[
https://issues.apache.org/jira/browse/AIRFLOW-932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15891251#comment-15891251
]
Dan Davydov edited comment on AIRFLOW-932 at 3/1/17 11:01 PM:
--------------------------------------------------------------
Command:
{quote}airflow backfill teztg -t create_and_transfer_roster -s 2017-06-14 -e 2017-06-14{quote}
DAG:
{code}
import airflow
from airflow.executors import SequentialExecutor
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.models import DAG
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2016, 3, 15),
}

dag = DAG('teztg', default_args=default_args)
subdag = DAG('teztg.create_and_transfer_roster', default_args=default_args)

subdag_operator = SubDagOperator(task_id='create_and_transfer_roster',
                                 subdag=subdag,
                                 executor=SequentialExecutor(),
                                 dag=dag)


def create_roster(ds, macros, **kwargs):
    pass


PythonOperator(
    task_id='create_roster',
    python_callable=create_roster,
    dag=subdag,
    provide_context=True)

first_task = DummyOperator(task_id="first_task", dag=dag)
{code}
Expected:
first_task TI does not change (stays as none)
create_and_transfer_roster TI is run
Actual:
first_task TI gets its state set to removed
create_and_transfer_roster TI is not run; the backfill fails with
airflow.exceptions.AirflowException: Task first_task not found
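To make the expected behavior concrete, here is a toy model in plain Python. It is only a sketch: the dicts stand in for task instance states, the function name expected_backfill is hypothetical, and nothing here is Airflow's actual backfill code. It also assumes the selected task succeeds when it is run.

```python
# Toy model only: plain dicts stand in for task instance (TI) states.
# None of these names are Airflow APIs; they are hypothetical.


def expected_backfill(ti_states, selected_task_ids):
    """Return the TI states expected after a backfill limited to some tasks.

    Selected tasks are assumed to run and succeed; every TI that was not
    selected keeps whatever state it had before.
    """
    result = dict(ti_states)  # copy, so the input is left untouched
    for task_id in selected_task_ids:
        if task_id not in result:
            raise KeyError("Task %s not found" % task_id)
        result[task_id] = "success"
    return result


# Both TIs start with no state, as in the report above.
before = {"first_task": None, "create_and_transfer_roster": None}
after = expected_backfill(before, ["create_and_transfer_roster"])
```

In this model first_task keeps its state of None while only create_and_transfer_roster changes, which is the "Expected" outcome; the "Actual" outcome corresponds to first_task ending up as removed instead.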
[~bolke], if you think these are related to your changes (addition of
task.REMOVED state/backfill changes), would you mind taking a look? Otherwise
let me know and I can do so.
> Backfills delete existing task instances and mark them as removed
> -----------------------------------------------------------------
>
> Key: AIRFLOW-932
> URL: https://issues.apache.org/jira/browse/AIRFLOW-932
> Project: Apache Airflow
> Issue Type: Sub-task
> Components: backfill
> Reporter: Dan Davydov
> Priority: Blocker
>
> I'm still investigating.