[ 
https://issues.apache.org/jira/browse/AIRFLOW-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074147#comment-17074147
 ] 

ASF GitHub Bot commented on AIRFLOW-4355:
-----------------------------------------

tooptoop4 commented on pull request #8082: [AIRFLOW-4355] removed task should 
not lead to dagrun success
URL: https://github.com/apache/airflow/pull/8082
 
 
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [ ] Description above provides context of the change
   - [ ] Unit tests coverage for changes (not needed for documentation changes)
   - [ ] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [ ] Relevant documentation is updated including usage instructions.
   - [ ] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Externally triggered DAG is marked as 'success' even if a task has been 
> 'removed'!
> ----------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-4355
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4355
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DAG, DagRun, scheduler
>    Affects Versions: 1.10.3
>            Reporter: t oo
>            Assignee: t oo
>            Priority: Blocker
>              Labels: dynamic
>             Fix For: 2.0.0
>
>         Attachments: dag_success_even_if_task_removed.png, treeview.png
>
>
> note: all my DAGs are purely externally triggered
> *Issue:* The DAG has 5 parallel tasks that ran successfully and 1 final task that somehow ended up in the 'removed' state (prior DAG runs had it in the 'failed' state) and never ran successfully, yet the DAG run is still showing success!
>  
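> (Illustrative note, not part of the original report: the linked PR #8082 is titled "removed task should not lead to dagrun success". Below is a minimal, deliberately simplified sketch of the kind of guard that title implies; it is hypothetical, not the Airflow scheduler's actual state-evaluation code and not the PR's diff.)
> {noformat}
> # Hypothetical sketch only -- not Airflow code. It shows the guard implied by the
> # PR title: a dag_run should not be marked 'success' while one of its task
> # instances is still 'removed' (i.e. it never actually ran).
> def decide_dagrun_state(task_states):
>     """task_states: list of task-instance state strings for one dag_run."""
>     if any(s in ('failed', 'upstream_failed') for s in task_states):
>         return 'failed'
>     # A 'removed' task never ran, so the run is unfinished rather than successful.
>     if any(s == 'removed' for s in task_states):
>         return 'running'
>     if all(s == 'success' for s in task_states):
>         return 'success'
>     return 'running'
> 
> # States from this issue: 5 successful parallel loads + 1 removed REPAIR_HIVE task.
> print(decide_dagrun_state(['success'] * 5 + ['removed']))  # -> 'running', not 'success'
> {noformat}
> 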
> *Command run* (note that previous commands such as airflow trigger_dag -e 20190412 qsr_coremytbl had been run before and failed for a valid reason, i.e. the Python task failing):
> airflow trigger_dag -e 20190412T08:00 qsr_coremytbl --conf '{"hourstr":"08"}'
>  
> *Some logs from the prior Airflow instance (the EC2 was auto-healed):*
> [2019-04-18 08:29:40,678] {logging_mixin.py:95} INFO - [2019-04-18 08:29:40,678] {__init__.py:4897} WARNING - Failed to get task '<TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [None]>' for dag '<DAG: qsr_coremytbl>'. Marking it as removed.
> [2019-04-18 08:29:43,582] {logging_mixin.py:95} INFO - [2019-04-18 08:29:43,582] {__init__.py:4906} INFO - Restoring task '<TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [removed]>' which was previously removed from DAG '<DAG: qsr_coremytbl>'
> [2019-04-18 08:29:43,618] {jobs.py:1787} INFO - Creating / updating <TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [scheduled]> in ORM
> [2019-04-18 08:29:43,676] {logging_mixin.py:95} INFO - [2019-04-18 08:29:43,676] {__init__.py:4897} WARNING - Failed to get task '<TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [scheduled]>' for dag '<DAG: qsr_coremytbl>'. Marking it as removed.
>  
> *Some logs from the newer EC2 instance:*
> [myuser@host logs]$ grep -i hive -R * | sed 's#[0-9]#x#g' | sort | uniq -c | grep -v 'airflow-webserver-access.log'
>  2 audit/airflow-audit.log:xxxx-xx-xx xx:xx:xx.xxxxxx xxxx qsr_coremytbl REPAIR_HIVE_schemeh.mytbl log xxxx-xx-xx xx:xx:xx.xxxxxx rsawyerx [('execution_date', u'xxxx-xx-xxTxx:xx:xx+xx:xx'), ('task_id', u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl')]
>  1 audit/airflow-audit.log:xxxx-xx-xx xx:xx:xx.xxxxxx xxxx qsr_coremytbl REPAIR_HIVE_schemeh.mytbl log xxxx-xx-xx xx:xx:xx.xxxxxx rsawyerx [('execution_date', u'xxxx-xx-xxTxx:xx:xx+xx:xx'), ('task_id', u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl'), ('format', u'json')]
>  1 audit/airflow-audit.log:xxxx-xx-xx xx:xx:xx.xxxxxx xxxx qsr_coremytbl REPAIR_HIVE_schemeh.mytbl rendered xxxx-xx-xx xx:xx:xx.xxxxxx rsawyerx [('execution_date', u'xxxx-xx-xxTxx:xx:xx+xx:xx'), ('task_id', u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl')]
>  1 audit/airflow-audit.log:xxxx-xx-xx xx:xx:xx.xxxxxx xxxx qsr_coremytbl REPAIR_HIVE_schemeh.mytbl task xxxx-xx-xx xx:xx:xx.xxxxxx rsawyerx [('execution_date', u'xxxx-xx-xxTxx:xx:xx+xx:xx'), ('task_id', u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl')]
>  1 scheduler/latest/qsr_dag_generation.py.log:[xxxx-xx-xx xx:xx:xx,xxx] {jobs.py:xxxx} INFO - Creating / updating <TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl xxxx-xx-xx xx:xx:xx+xx:xx [scheduled]> in ORM
>  71 scheduler/latest/qsr_dag_generation.py.log:[xxxx-xx-xx xx:xx:xx,xxx] {logging_mixin.py:xx} INFO - [xxxx-xx-xx xx:xx:xx,xxx] {__init__.py:xxxx} INFO - Restoring task '<TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl xxxx-xx-xx xx:xx:xx+xx:xx [removed]>' which was previously removed from DAG '<DAG: qsr_coremytbl>'
>  1 scheduler/xxxx-xx-xx/qsr_dag_generation.py.log:[xxxx-xx-xx xx:xx:xx,xxx] {jobs.py:xxxx} INFO - Creating / updating <TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl xxxx-xx-xx xx:xx:xx+xx:xx [scheduled]> in ORM
>  71 scheduler/xxxx-xx-xx/qsr_dag_generation.py.log:[xxxx-xx-xx xx:xx:xx,xxx] {logging_mixin.py:xx} INFO - [xxxx-xx-xx xx:xx:xx,xxx] {__init__.py:xxxx} INFO - Restoring task '<TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl xxxx-xx-xx xx:xx:xx+xx:xx [removed]>' which was previously removed from DAG '<DAG: qsr_coremytbl>'
>  
> mysql> select * from task_instance where task_id like '%REP%';
>  
> |task_id|dag_id|execution_date|start_date|end_date|duration|state|try_number|hostname|unixname|job_id|pool|queue|priority_weight|operator|queued_dttm|pid|max_tries|executor_config|
> |REPAIR_HIVE_mytbl|qsr_coremytbl|2019-04-13 00:00:00.000000|2019-04-17 20:29:19.639059|2019-04-17 20:29:33.700252|14.0612|failed|1|ip-ec2|myuser|305|NULL|default|1|PythonOperator|2019-04-17 20:29:13.604354|899|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-01-01 00:00:00.000000|2019-04-17 21:30:11.439127|2019-04-17 21:30:11.439142|NULL|upstream_failed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-01-02 00:00:00.000000|2019-04-17 21:46:34.163607|2019-04-17 21:46:34.163627|NULL|upstream_failed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-01-03 00:00:00.000000|2019-04-17 21:50:48.541224|2019-04-17 21:50:48.541239|NULL|upstream_failed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-01-03 05:00:00.000000|2019-04-17 22:00:24.286685|2019-04-17 22:00:24.286709|NULL|upstream_failed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-11 00:00:00.000000|2019-04-17 21:26:08.621737|2019-04-17 21:26:22.686882|14.0651|failed|1|ip-ec2|myuser|316|NULL|default|1|PythonOperator|2019-04-17 21:26:03.083885|29638|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-11 17:00:00.000000|2019-04-17 22:44:52.900860|2019-04-17 22:45:04.403179|11.5023|failed|1|ip-ec2|myuser|348|NULL|default|1|PythonOperator|2019-04-17 22:44:47.895363|10815|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-11 22:00:00.000000|2019-04-17 22:37:28.409799|2019-04-17 22:37:41.448494|13.0387|failed|1|ip-ec2|myuser|342|NULL|default|1|PythonOperator|2019-04-17 22:37:23.449554|28697|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-12 00:00:00.000000|2019-04-17 21:02:36.365150|2019-04-17 21:02:36.365165|NULL|upstream_failed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-12 07:00:00.000000|2019-04-18 08:24:59.552695|2019-04-18 08:24:59.552710|NULL|removed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-12 08:00:00.000000|NULL|NULL|NULL|removed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
>  11 rows in set (0.00 sec)
>  
>  
> *Task Instance Details*
>  Dependencies Blocking Task From Getting Scheduled
>  Dependency | Reason
>  Task Instance State | Task is in the 'removed' state which is not a valid state for execution. The task must be cleared in order to be run.
>  Dagrun Running | Task instance's dagrun was not in the 'running' state but in the state 'success'.
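> 
> (Illustrative note: the two blocked dependencies above suggest a manual workaround of clearing the stuck task instance and re-activating its dag_run. The sketch below is hedged: it uses the Airflow 1.10-era Python API with the dag_id and execution_date from this report; everything else is an assumption, and it is a possible workaround, not a fix for the underlying bug.)
> {noformat}
> # Hedged sketch (Airflow 1.10-era API; dag_id and execution_date taken from this issue).
> # Clears the task instances of the broken run so the 'removed' TI can be re-created,
> # then explicitly puts the dag_run back into RUNNING so the 'Dagrun Running'
> # dependency is satisfied (dag.clear may already re-activate the run; shown anyway).
> from airflow import settings
> from airflow.models import DagBag, DagRun
> from airflow.utils import timezone
> from airflow.utils.state import State
> 
> EXEC_DATE = timezone.datetime(2019, 4, 12, 8, 0)   # execution_date of the broken run
> dag = DagBag().get_dag('qsr_coremytbl')
> 
> # Clear all task instances of this DAG for that single execution_date.
> dag.clear(start_date=EXEC_DATE, end_date=EXEC_DATE)
> 
> # Flip the dag_run back to RUNNING.
> session = settings.Session()
> run = (session.query(DagRun)
>        .filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date == EXEC_DATE)
>        .one())
> run.state = State.RUNNING
> session.commit()
> {noformat}
> 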
>  Attribute: python_callable
>  def repair_hive_table(hive_cli_conn_id, schema, table, drop_partitions_first=False):
>      conn = BaseHook.get_connection(hive_cli_conn_id)
>      ssl_options = conn.extra_dejson.get('ssl-options')
>      jdbc_url = "jdbc:hive2://{conn.host}:{conn.port}/{conn.schema};{ssl_options}".format(**locals())
>      sqlstmt = 'MSCK REPAIR TABLE %s.%s;' % (schema, table)
>      hive_command = ' '.join([
>          '/home/myuser/spark_home/bin/beeline',
>          '-u', '"%s"' % jdbc_url,
>          '-n', conn.login,
>          '-w', '/home/myuser/spark_home/hivepw',
>          '-e', '"%s"' % sqlstmt])
>      # note - do not log the command, which contains truststore and hive user passwords
>      logging.info("Executing following SQL statement: %s" % sqlstmt)
>      process = subprocess.Popen(hive_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
>      (output, error) = process.communicate()
>      logging.info(output)
>      if process.returncode != 0:
>          logging.error('Hive process returned code %d' % process.returncode)
>          raise Exception('Hive process returned code %d' % process.returncode)
>  Task Instance Attributes
>  Attribute Value
>  dag_id qsr_coremytbl
>  duration None
>  end_date None
>  execution_date 2019-04-12T08:00:00+00:00
>  executor_config {}
>  generate_command <function generate_command at 0x7f6526f97ed8>
>  hostname 
>  is_premature False
>  job_id None
>  key ('qsr_coremytbl', u'REPAIR_HIVE_schemeh.mytbl', <Pendulum [2019-04-12T08:00:00+00:00]>, 1)
>  log <logging.Logger object at 0x7f65267a2d50>
>  log_filepath /home/myuser/airflow/logs/qsr_coremytbl/REPAIR_HIVE_schemeh.mytbl/2019-04-12T08:00:00+00:00.log
>  log_url [https://domain/admin/airflow/log?dag_id=qsr_coremytbl&task_id=REPAIR_HIVE_schemeh.mytbl&execution_date=2019-04-12T08%3A00%3A00%2B00%3A00]
>  logger <logging.Logger object at 0x7f65267a2d50>
>  mark_success_url [https://domain/admin/airflow/success?task_id=REPAIR_HIVE_schemeh.mytbl&dag_id=qsr_coremytbl&execution_date=2019-04-12T08%3A00%3A00%2B00%3A00&upstream=false&downstream=false]
>  max_tries 0
>  metadata MetaData(bind=None)
>  next_try_number 1
>  operator None
>  pid None
>  pool None
>  previous_ti None
>  priority_weight 1
>  queue default
>  queued_dttm None
>  raw False
>  run_as_user None
>  start_date None
>  state removed
>  task <Task(PythonOperator): REPAIR_HIVE_schemeh.mytbl>
>  task_id REPAIR_HIVE_schemeh.mytbl
>  test_mode False
>  try_number 1
>  unixname myuser
>  Task Attributes
>  Attribute Value
>  dag <DAG: qsr_coremytbl>
>  dag_id qsr_coremytbl
>  depends_on_past False
>  deps set([<TIDep(Not In Retry Period)>, <TIDep(Trigger Rule)>, 
> <TIDep(Previous Dagrun State)>])
>  downstream_list []
>  downstream_task_ids set([])
>  email None
>  email_on_failure True
>  email_on_retry True
>  end_date None
>  execution_timeout None
>  executor_config {}
>  inlets []
>  lineage_data None
>  log <logging.Logger object at 0x7f65167edad0>
>  logger <logging.Logger object at 0x7f65167edad0>
>  max_retry_delay None
>  on_failure_callback None
>  on_retry_callback None
>  on_success_callback None
>  op_args []
>  op_kwargs {'table': u'mytbl', 'hive_cli_conn_id': 'query_hive', 'schema': u'schemeh'}
>  outlets []
>  owner airflow
>  params {}
>  pool None
>  priority_weight 1
>  priority_weight_total 1
>  provide_context False
>  queue default
>  resources {'disk': {'_qty': 512, '_units_str': 'MB', '_name': 'Disk'}, 'gpus': {'_qty': 0, '_units_str': 'gpu(s)', '_name': 'GPU'}, 'ram': {'_qty': 512, '_units_str': 'MB', '_name': 'RAM'}, 'cpus': {'_qty': 1, '_units_str': 'core(s)', '_name': 'CPU'}}
>  retries 0
>  retry_delay 0:05:00
>  retry_exponential_backoff False
>  run_as_user None
>  schedule_interval None
>  shallow_copy_attrs ('python_callable', 'op_kwargs')
>  sla None
>  start_date 2017-06-01T00:00:00+00:00
>  task_concurrency None
>  task_id REPAIR_HIVE_schemeh.mytbl
>  task_type PythonOperator
>  template_ext []
>  template_fields ('templates_dict', 'op_args', 'op_kwargs')
>  templates_dict None
>  trigger_rule all_success
>  ui_color #ffefeb
>  ui_fgcolor #000
>  upstream_list [<Task(SparkSubmitOperator): coremytbl_FR__H>, 
> <Task(SparkSubmitOperator): coremytbl_BE__H>, <Task(SparkSubmitOperator): 
> coremytbl_NL__H>, <Task(SparkSubmitOperator): coremytbl_DE__H>, 
> <Task(SparkSubmitOperator): coremytbl_DAGNAME__H>]
>  upstream_task_ids set([u'coremytbl_FR__H', u'coremytbl_BE__H', 
> u'coremytbl_NL__H', u'coremytbl_DE__H', u'coremytbl_DAGNAME__H'])
>  wait_for_downstream False
>  weight_rule downstream
>  
> DAG code:
> {noformat}
>  
> import datetime as dt
> import glob
> import json
> import logging
> import os
> import subprocess
> #import urllib
> from airflow import DAG
> from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
> from airflow.operators.python_operator import PythonOperator
> from airflow.hooks.base_hook import BaseHook
> 
> # Globals
> DATALAKE_S3ROOT = subprocess.check_output(". /etc/pipeline/profile; dig -t TXT +short qsrs3.$pipelineBranch.sss.$networkDomainName | tail -1 | tr -d '\"' | sed -e s#^s3://##", shell=True).strip()
> 
> default_args = {
>     'owner': 'airflow',
>     'start_date': dt.datetime(2017, 6, 1),
>     'retries': 0,
>     'retry_delay': dt.timedelta(minutes=5),
> }
> 
> def repair_hive_table(hive_cli_conn_id, schema, table, drop_partitions_first=False):
>     conn = BaseHook.get_connection(hive_cli_conn_id)
>     ssl_options = conn.extra_dejson.get('ssl-options')
>     jdbc_url = "jdbc:hive2://{conn.host}:{conn.port}/{conn.schema};{ssl_options}".format(**locals())
>     sqlstmt = 'MSCK REPAIR TABLE %s.%s;' % (schema, table)
>     hive_command = ' '.join([
>         '/home/myuser/spark_home/bin/beeline',
>         '-u', '"%s"' % jdbc_url,
>         '-n', conn.login,
>         '-w', '/home/myuser/spark_home/hivepw',
>         '-e', '"%s"' % sqlstmt])
>     # note - do not log the command, which contains truststore and hive user passwords
>     logging.info("Executing following SQL statement: %s" % sqlstmt)
>     process = subprocess.Popen(hive_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
>     (output, error) = process.communicate()
>     logging.info(output)
>     if process.returncode != 0:
>         logging.error('Hive process returned code %d' % process.returncode)
>         raise Exception('Hive process returned code %d' % process.returncode)
> 
> def create_spark_submit_operator(dag, pool, runstream, iz_strip_name, table, filename):
>     task_id = "%s_%s" % (runstream, iz_strip_name)
>     # need to handle validation and import
>     filename = iz_strip_name + '_input.yaml'
>     file_path = os.path.join('/home/myuser/jsonconfigs/si/job', runstream, 'jar/input', filename)
>     return SparkSubmitOperator(
>         dag=dag,  # need to tell airflow that this task belongs to the dag we defined above
>         task_id=task_id,
>         pool=pool,
>         #params={"lob": lob}, # the sql file above has a template in it for a 'lob' parameter - this is how we pass it in
>         #bash_command='echo "Data Load task 1 {{ params.lob }} here"'
>         conn_id='process',
>         java_class='com.cimp.jar.jar',
>         application='s3a://%s/jar.jar' % DATALAKE_S3ROOT,
>         #total_executor_cores='16',
>         executor_cores='2',
>         executor_memory='8g',
>         driver_memory='2g',
>         conf={
>             "spark.sql.parquet.writeLegacyFormat": "true",
>             "spark.executor.extraJavaOptions": "-Dinput_wildcard={{ ds_nodash }}{{ '*'+dag_run.conf['hourstr'] if dag_run.conf['hourstr'] else '' }} -Ddatestr={{ ds_nodash }} -Dhourstr={{ dag_run.conf['hourstr'] if dag_run.conf['hourstr'] else '' }}",
>             "spark.driver.extraJavaOptions": "-Dinput_wildcard={{ ds_nodash }}{{ '*'+dag_run.conf['hourstr'] if dag_run.conf['hourstr'] else '' }} -Ddatestr={{ ds_nodash }} -Dhourstr={{ dag_run.conf['hourstr'] if dag_run.conf['hourstr'] else '' }}",
>         },
>         #num_executors='2',
>         name="%s_{{ ds_nodash }}" % task_id,
>         verbose=True,
>         # don't know how this will work (where do the prior exports get made) or if the commandline way can be used
>         #env_vars={"input_wildcard": os.environ['input_wildcard'], "datestr": os.environ['datestr'], "hourstr": os.environ['hourstr']},
>         application_args=[
>             "--config", "%s" % file_path
>         ],
>     )
> 
> def create_load_dag(dag_id, runstream):
>     dag = DAG(dag_id,  # give the dag a name
>               schedule_interval=None,  # define how often you want it to run - you can pass cron expressions here
>               default_args=default_args)  # pass the default args defined above, or override them here if you want this dag to behave a little differently
> 
>     file_list = os.path.join('/home/myuser/jsonconfigs/si/job', runstream, 'file_list.json')
>     process = subprocess.Popen(["cat", file_list], stdout=subprocess.PIPE)
>     decoded_data = json.load(process.stdout)
> 
>     table_repair_dependencies = {}  # (schema, table) -> [load tasks]
>     for file in decoded_data['files']:
>         srcfile = file['name']
>         iz_strip_name = file['iz_strip_name']
>         schema = file['Schema']
>         hive_table = file['hive_table']
>         # One staging task
>         stagingTask = create_spark_submit_operator(dag, None, runstream, iz_strip_name, srcfile, "QSRLOAD")
>         stagingTask.doc_md = """\
>         QSR jar LOAD for %s to %s.%s
>         """ % (srcfile, schema, hive_table)
>         try:
>             table_repair_dependencies[(schema, hive_table)].append(stagingTask)
>         except KeyError:
>             table_repair_dependencies[(schema, hive_table)] = [stagingTask]
> 
>     # table repair tasks
>     for (schema, object_name) in table_repair_dependencies.keys():
>         repairHiveTask = PythonOperator(dag=dag,
>                                         task_id="REPAIR_HIVE_%s.%s" % (schema, object_name),
>                                         python_callable=repair_hive_table,
>                                         op_kwargs={'hive_cli_conn_id': 'query_hive',
>                                                    'schema': schema,
>                                                    'table': object_name})
>         repairHiveTask << table_repair_dependencies[(schema, object_name)]
> 
>     return dag
> 
> # dynamically generate all dags
> for entry in os.listdir('/home/myuser/jsonconfigs/si/job'):
>     file_list = glob.glob(os.path.join('/home/myuser/jsonconfigs/si/job', entry, 'file_list.json'))
>     if file_list:
>         runstream = entry
>         dag_id = 'qsr_%s' % runstream
>         globals()[dag_id] = create_load_dag(dag_id, runstream)
> {noformat}
>  
> *After encountering this issue I ran*: airflow clear mycooldag
> Below was the output. As you can see, the REPAIR_HIVE task was never successful, so I don't know how the DAG can be in the overall 'success' state! (A sketch for checking the dag_run state directly against its task instances follows the output below.)
>  
>  
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-01-01 00:00:00+00:00 [upstream_failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-01-02 00:00:00+00:00 [upstream_failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-01-03 00:00:00+00:00 [upstream_failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-01-03 05:00:00+00:00 [upstream_failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-11 00:00:00+00:00 [failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-11 17:00:00+00:00 [failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-11 22:00:00+00:00 [failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-12 00:00:00+00:00 [upstream_failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-12 07:00:00+00:00 [removed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [removed]>
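> 
> (Illustrative note: the sketch below, referenced above, is a hedged way to check this directly in the metadata DB via the Airflow 1.10-era ORM models. The dag_id and execution_date are taken from this report; everything else is an assumption. With the data shown in the table earlier, it would print a 'success' dag_run alongside a 'removed' REPAIR_HIVE task instance.)
> {noformat}
> # Hedged illustration (Airflow 1.10-era ORM): print the dag_run state next to every
> # task-instance state for the affected run, making the contradiction visible.
> from airflow import settings
> from airflow.models import DagRun, TaskInstance
> from airflow.utils import timezone
> 
> DAG_ID = 'qsr_coremytbl'
> EXEC_DATE = timezone.datetime(2019, 4, 12, 8, 0)
> 
> session = settings.Session()
> run = (session.query(DagRun)
>        .filter(DagRun.dag_id == DAG_ID, DagRun.execution_date == EXEC_DATE)
>        .one())
> print('dag_run state:', run.state)
> 
> for ti in (session.query(TaskInstance)
>            .filter(TaskInstance.dag_id == DAG_ID,
>                    TaskInstance.execution_date == EXEC_DATE)):
>     print(ti.task_id, ti.state)
> # Expected from the report: REPAIR_HIVE_schemeh.mytbl is 'removed' while the
> # dag_run is already 'success'.
> {noformat}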



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
