It is a new table named 'TaskExclusion'. The migration script for this is as follows:
def upgrade():
    op.create_table(
        'task_exclusion',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('dag_id', sa.String(length=250), nullable=False),
        sa.Column('task_id', sa.String(length=250), nullable=False),
        sa.Column('exclusion_type', sa.String(length=32), nullable=False),
        sa.Column('exclusion_start_date', sa.DateTime(), nullable=False),
        sa.Column('exclusion_end_date', sa.DateTime(), nullable=False),
        sa.Column('created_by', sa.String(length=256), nullable=False),
        sa.Column('created_on', sa.DateTime(), nullable=False),
        sa.PrimaryKeyConstraint('id'))

def downgrade():
    op.drop_table('task_exclusion')

This is the PR for the exclusion of a task. We review our code internally before setting up a PR into the main repo for the next review, hence the PR being in our fork. The PR does not yet contain our unit tests.

Cheers,
Luke Maycock
OLIVER WYMAN
luke.mayc...@affiliate.oliverwyman.com<mailto:luke.mayc...@affiliate.oliverwyman.com>
www.oliverwyman.com<http://www.oliverwyman.com/>

________________________________
From: Bolke de Bruin <bdbr...@gmail.com>
Sent: 09 December 2016 20:54
To: dev@airflow.incubator.apache.org
Subject: Re: Skip task

What table was this? I recently pushed a fix that allows fractional seconds in our minimum supported version of MySQL (5.6.4 and beyond). I might have missed something.

Thanks
Bolke

Sent from my iPhone

> On 9 Dec 2016, at 14:27, Maycock, Luke <luke.mayc...@affiliate.oliverwyman.com> wrote:
>
> I found the issue to be that, for MySQL, the datetime was being rounded to
> the nearest second. The strange thing is that if a datetime without the
> microseconds was passed to SQLAlchemy, the insertion into MySQL failed; but
> when a datetime with microseconds was passed, the microseconds are removed by
> rounding to the nearest second.
>
>
> Hopefully, this will prevent someone else going down the same rabbit hole
> that I did.
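As an illustration of the rounding described above, here is a minimal sketch that needs no database. It assumes the column keeps whole seconds only; `simulate_whole_second_column` is a hypothetical stand-in for that storage behaviour, not a MySQL, SQLAlchemy or Airflow function, and the timestamp is made up.

```python
from datetime import datetime, timedelta


def simulate_whole_second_column(value):
    """Hypothetical stand-in for a DATETIME column without fractional
    seconds: round to the nearest second, as described in the thread."""
    if value.microsecond >= 500000:
        value += timedelta(seconds=1)
    return value.replace(microsecond=0)


# Made-up timestamp that carries microseconds, like datetime.now() would.
exec_date = datetime(2016, 12, 9, 14, 27, 3, 250000)
stored = simulate_whole_second_column(exec_date)

# An equality filter compares the stored value with the original one, which
# still carries microseconds, so no row would match.
lookup_matches = stored == exec_date

# Dropping the microseconds before both the insert and the lookup keeps the
# two values identical.
normalised = exec_date.replace(microsecond=0)
normalised_matches = simulate_whole_second_column(normalised) == normalised
```

Under that assumption, either normalising the timestamp in the test or storing fractional seconds in the column would likely make the equality filter behave the same on all three backends.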
>
>
> Cheers,
> Luke Maycock
> OLIVER WYMAN
> luke.mayc...@affiliate.oliverwyman.com<mailto:luke.mayc...@affiliate.oliverwyman.com>
> www.oliverwyman.com<http://www.oliverwyman.com/>
>
>
> ________________________________
> From: Maycock, Luke <luke.mayc...@affiliate.oliverwyman.com>
> Sent: 08 December 2016 10:44:32
> To: dev@airflow.incubator.apache.org
> Subject: Re: Skip task
>
> Hi All,
>
>
> We have implemented a solution for allowing the exclusion of individual tasks
> during a DAG run. However, when writing unit tests for this, we are
> encountering an issue with MySQL, which I am hoping someone is able to help
> us with.
>
>
> For our solution, we have a new 'TaskExclusion' table in the meta-data. Our
> unit tests were run by Travis, not locally.
>
>
> The code block under test:
>
>
> class TaskExclusion(Base):
>     """
>     This class is used to define objects that can be used to specify not to
>     run a given task in a given dag on a variety of execution date conditions.
>     These objects will be stored in the backend database in the task_exclusion
>     table.
>     Static methods are provided for the creation, removal and investigation of
>     these objects.
>     """
>
>     __tablename__ = "task_exclusion"
>
>     id = Column(Integer(), primary_key=True)
>     dag_id = Column(String(ID_LEN), nullable=False)
>     task_id = Column(String(ID_LEN), nullable=False)
>     exclusion_type = Column(String(32), nullable=False)
>     exclusion_start_date = Column(DateTime, nullable=True)
>     exclusion_end_date = Column(DateTime, nullable=True)
>     created_by = Column(String(256), nullable=False)
>     created_on = Column(DateTime, nullable=False)
>
>     @classmethod
>     @provide_session
>     def set(
>             cls,
>             dag_id,
>             task_id,
>             exclusion_type,
>             exclusion_start_date,
>             exclusion_end_date,
>             created_by,
>             session=None):
>         """
>         Add a task exclusion to prevent a task running under certain
>         circumstances.
>         :param dag_id: The dag_id of the DAG containing the task to exclude
>         from execution.
>         :param task_id: The task_id of the task to exclude from execution.
>         :param exclusion_type: The type of circumstances to exclude the task
>         from execution under. See the TaskExclusionType class for more detail.
>         :param exclusion_start_date: The execution_date to start excluding on.
>         This will be ignored if the exclusion_type is INDEFINITE.
>         :param exclusion_end_date: The execution_date to stop excluding on.
>         This will be ignored if the exclusion_type is INDEFINITE or
>         SINGLE_DATE.
>         :param created_by: Who is creating this exclusion. Stored with the
>         exclusion record for auditing/debugging purposes.
>         :return: None.
>         """
>
>         session.expunge_all()
>
>         # Set up execution date range correctly
>         if exclusion_type == TaskExclusionType.SINGLE_DATE:
>             if exclusion_start_date:
>                 exclusion_end_date = exclusion_start_date
>             else:
>                 raise AirflowException(
>                     "No exclusion_start_date "
>                 )
>         elif exclusion_type == TaskExclusionType.DATE_RANGE:
>             if exclusion_start_date > exclusion_end_date:
>                 raise AirflowException(
>                     "The exclusion_start_date is after the exclusion_end_date"
>                 )
>         elif exclusion_type == TaskExclusionType.INDEFINITE:
>             exclusion_start_date = None
>             exclusion_end_date = None
>         else:
>             raise AirflowException(
>                 "The exclusion_type, {}, is not recognised."
>                 .format(exclusion_type)
>             )
>
>         # remove any duplicate exclusions
>         session.query(cls).filter(
>             cls.dag_id == dag_id,
>             cls.task_id == task_id,
>             cls.exclusion_type == exclusion_type,
>             cls.exclusion_start_date == exclusion_start_date,
>             cls.exclusion_end_date == exclusion_end_date
>         ).delete()
>
>         # insert new exclusion
>         session.add(TaskExclusion(
>             dag_id=dag_id,
>             task_id=task_id,
>             exclusion_type=exclusion_type,
>             exclusion_start_date=exclusion_start_date,
>             exclusion_end_date=exclusion_end_date,
>             created_by=created_by,
>             created_on=datetime.now())
>         )
>
>         session.commit()
>
>
> The unit test:
>
> class TaskExclusionTest(unittest.TestCase):
>     def test_set_exclusion(self, session=None):
>
>         session = settings.Session()
>
>         session.expunge_all()
>
>         dag_id = 'test_task_exclude'
>         task_id = 'test_task_exclude'
>         exec_date = datetime.datetime.now()
>
>         TaskExclusion.set(dag_id=dag_id,
>                           task_id=task_id,
>                           exclusion_type=TaskExclusionType.SINGLE_DATE,
>                           exclusion_start_date=exec_date,
>                           exclusion_end_date=exec_date,
>                           created_by='airflow')
>
>
>         exclusion = session.query(TaskExclusion).filter(
>             TaskExclusion.dag_id == dag_id,
>             TaskExclusion.task_id == task_id,
>             TaskExclusion.exclusion_type ==
>             TaskExclusionType.SINGLE_DATE,
>             TaskExclusion.exclusion_start_date == exec_date,
>             TaskExclusion.exclusion_end_date == exec_date).first()
>
>         self.assertTrue(exclusion)
>
>
> The unit test passes for PostgreSQL and SQLite but fails for MySQL. I have
> checked and the 'exclusion' variable contains a TaskExclusion object for
> PostgreSQL and SQLite but is set to 'None' for MySQL. Any suggestions on what
> could be causing this would be much appreciated.
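The date handling in `set` can be restated as a small pure function, which may be easier to unit test without a database. This is only a sketch: the string constants stand in for `TaskExclusionType`, whose values are not shown here, `normalise_exclusion_dates` is a hypothetical name, and `ValueError` replaces `AirflowException` so that the example runs without Airflow installed.

```python
from datetime import datetime

# Assumed string values; the thread does not show TaskExclusionType itself.
SINGLE_DATE = "single_date"
DATE_RANGE = "date_range"
INDEFINITE = "indefinite"


def normalise_exclusion_dates(exclusion_type, start, end):
    """Return the (start, end) pair that would be stored for an exclusion.

    Follows the same branching as TaskExclusion.set, but raises ValueError
    instead of AirflowException (an assumption made for portability).
    """
    if exclusion_type == SINGLE_DATE:
        if not start:
            raise ValueError("No exclusion_start_date")
        # A single date is stored as a range of one day.
        return start, start
    if exclusion_type == DATE_RANGE:
        if start > end:
            raise ValueError(
                "The exclusion_start_date is after the exclusion_end_date")
        return start, end
    if exclusion_type == INDEFINITE:
        # Both dates are ignored for an indefinite exclusion.
        return None, None
    raise ValueError(
        "The exclusion_type, {}, is not recognised.".format(exclusion_type))


day = datetime(2016, 12, 8)
single = normalise_exclusion_dates(SINGLE_DATE, day, None)
forever = normalise_exclusion_dates(INDEFINITE, day, day)
```

Keeping this logic separate from the session calls would let the branching be tested on its own, independently of which database backend the test suite runs against.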
>
>
> Cheers,
> Luke Maycock
> OLIVER WYMAN
> luke.mayc...@affiliate.oliverwyman.com<mailto:luke.mayc...@affiliate.oliverwyman.com>
> www.oliverwyman.com<http://www.oliverwyman.com/>
>
>
>
> ________________________________
> From: siddharth anand <san...@apache.org>
> Sent: 16 November 2016 00:40
> To: dev@airflow.incubator.apache.org
> Subject: Re: Skip task
>
> If your requirement is to skip a portion of tasks in a DagRun based on some
> state encountered while executing that DagRun, that is what
> BranchPythonOperator or ShortCircuitOperator (optionally paired with a
> Trigger Rule specified on a downstream task) is made for.
>
> These operators take a custom Python callable as an argument. The callable
> can check for the existence of data or files that should have been
> generated by an external system or an upstream task in the same DAG. The
> callables need to return a Boolean value in the case of the
> ShortCircuitOperator or a selected choice (i.e. branch to take) as in the
> case of the BranchPythonOperator.
>
> If you have 20 tasks that all depend on the presence of 20 different files,
> you would need 20 ShortCircuitOperator or BranchPythonOperator tasks each
> either sharing a common callable or each with its own callable.
>
> One could argue that these tasks are "overhead" because they just encompass
> some conditional or control logic and that DAGs should only contain
> workhorse tasks (i.e. tasks that do some work). DAGs with workhorse-only
> tasks are more of a pure dataflow approach -- i.e. no control-logic
> operators. However, I don't see another option.
>
> In the current system, a callable registered with a ShortCircuitOperator
> would check for the presence of a file -- if the file were not available,
> then a series of downstream tasks would be skipped in that DAGRun, until a
> task with a Trigger_Rule="all_done" were encountered, downstream of which,
> tasks would no longer be skipped for the DagRun.
>
> I hope this makes sense.
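The skipping behaviour described above can be sketched as a toy model in plain Python. Nothing below is Airflow code: `file_has_arrived` is the kind of callable one might register with a ShortCircuitOperator, `plan_run` and the task names are hypothetical, and the chain is assumed to be linear, with the `all_done` task itself running again.

```python
import os
import tempfile


def file_has_arrived(path):
    """Example condition callable: True when the expected file exists."""
    return os.path.exists(path)


def plan_run(chain, condition_result):
    """Toy model (not Airflow code) of short-circuit skipping.

    `chain` is a list of (task_id, trigger_rule) tuples in downstream order.
    If the condition is False, tasks are skipped until one with the
    'all_done' trigger rule is reached; from that task on, tasks run again.
    """
    skipping = not condition_result
    states = {}
    for task_id, trigger_rule in chain:
        if skipping and trigger_rule == "all_done":
            skipping = False
        states[task_id] = "skipped" if skipping else "run"
    return states


# Hypothetical tasks downstream of the condition check.
chain = [
    ("load_file", "all_success"),
    ("transform", "all_success"),
    ("cleanup", "all_done"),
    ("report", "all_success"),
]

with tempfile.TemporaryDirectory() as folder:
    # Nothing has been written to the fresh folder, so the file is missing.
    expected_file = os.path.join(folder, "feed.csv")
    states_when_missing = plan_run(chain, file_has_arrived(expected_file))

states_when_present = plan_run(chain, True)
```

In this model the missing file causes `load_file` and `transform` to be skipped while `cleanup` and `report` still run, which is the shape of run the message describes; a real DAG with branches would need the scheduler's own dependency handling rather than a simple loop.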
>
> A long time ago, I proposed UI functionality to skip a series of DAG runs
> via the UI, because I knew that no data was available for that time range
> from an external system. I wanted to essentially specify a "blackout"
> period in terms of a time range that covered multiple DagRuns. My intention
> was for backfills to skip those days. It turns out that my company did not
> end up having such a requirement, so I dropped the feature request.
>
> If this is what you are asking for, then I am +1. Please implement it and
> submit a PR.
>
> On Tue, Nov 15, 2016 at 2:50 AM, Maycock, Luke <
> luke.mayc...@affiliate.oliverwyman.com> wrote:
>
>> Thank you for taking the time to respond. This is a great approach if you
>> know at the time of creating the DAG which tasks you expect to need to
>> skip. However, I don't think this is exactly the use case I have. For
>> example, I may be expecting a file to arrive in an FTP folder for loading
>> into a database but one day it doesn't arrive so I just want to skip that
>> task on that day.
>>
>>
>> Our workflows commonly have around 20 of these types of tasks in. I could
>> configure all of these tasks in the way you suggested in case I ever need
>> to skip one of them. However, I'd prefer not to have to set the tasks up
>> this way and instead have the ability just to skip a task on an ad-hoc
>> basis. I could then also use this functionality to add the ability to run
>> from a certain point in a DAG or to a certain point in the DAG.
>>
>>
>>
>> Thanks,
>> Luke Maycock
>> OLIVER WYMAN
>> luke.mayc...@affiliate.oliverwyman.com<mailto:luke.mayc...@affiliate.oliverwyman.com>
>> www.oliverwyman.com<http://www.oliverwyman.com/>
>>
>>
>>
>> ________________________________
>> From: siddharth anand <san...@apache.org>
>> Sent: 14 November 2016 19:48
>> To: dev@airflow.incubator.apache.org
>> Subject: Re: Skip task
>>
>> For cases like this, we (Agari) use the following approach:
>>
>> 1. Create a Variable in the UI of type boolean such as *enable_feature_x*
>> 2. Use a ShortCircuitOperator (or BranchPythonOperator) to skip
>>    downstream processing based on the value of *enable_feature_x*
>> 3. Assuming that you don't want to skip ALL downstream tasks, you can
>>    use a trigger_rule of all_done to resume processing some portion of your
>>    downstream DAG after skipping an upstream portion
>>
>> In other words, there is already a means to achieve what you are asking for
>> today. You can change the value of *enable_feature_x* via the UI. If you'd
>> like to enhance the UI to better capture this pattern, pls submit a PR.
>> -s
>>
>> On Thu, Nov 10, 2016 at 1:20 PM, Maycock, Luke <
>> luke.mayc...@affiliate.oliverwyman.com> wrote:
>>
>>> Hi Gerard,
>>>
>>>
>>> I see the new status as having a number of uses:
>>>
>>> 1. A user can manually set a task to skip in a DAG run via the UI.
>>> 2. We can then make use of this new status to add the following
>>>    functionality to Airflow:
>>>    * Run a DAG run up to a certain point and have the rest of the tasks
>>>      have the new status.
>>>    * Run a DAG run from a certain task to the end, setting all
>>>      pre-requisite tasks to have this new status.
>>>
>>> I am happy to be challenged on the above use cases if there are better
>>> ways to achieve the same things.
>>>
>>> Cheers,
>>> Luke Maycock
>>> OLIVER WYMAN
>>> luke.mayc...@affiliate.oliverwyman.com<mailto:luke.mayc...@affiliate.oliverwyman.com>
>>> www.oliverwyman.com<http://www.oliverwyman.com/>
>>>
>>>
>>>
>>> ________________________________
>>> From: Gerard Toonstra <gtoons...@gmail.com>
>>> Sent: 09 November 2016 18:08
>>> To: dev@airflow.incubator.apache.org
>>> Subject: Re: Skip task
>>>
>>> Hey Luke,
>>>
>>> Who or what makes the decision to skip processing that task?
>>>
>>> Rgds,
>>>
>>> Gerard
>>>
>>> On Wed, Nov 9, 2016 at 2:39 PM, Maycock, Luke <
>>> luke.mayc...@affiliate.oliverwyman.com> wrote:
>>>
>>>> Hi Gerard,
>>>>
>>>>
>>>> Thank you for your quick response.
>>>>
>>>>
>>>> I am not trying to implement this for a specific operator but rather
>>>> trying to add it as a feature for any task in any DAG.
>>>>
>>>>
>>>> Given that the skipped states propagate where all directly upstream
>>>> tasks are skipped, I don't think this is the state we want to use. For
>>>> the functionality I'm looking for, I think I'll need to introduce a new
>>>> status, maybe 'disabled'.
>>>>
>>>>
>>>> Again, thanks for your response.
>>>>
>>>>
>>>> Cheers,
>>>> Luke Maycock
>>>> OLIVER WYMAN
>>>> luke.mayc...@affiliate.oliverwyman.com<mailto:luke.mayc...@affiliate.oliverwyman.com>
>>>> www.oliverwyman.com<http://www.oliverwyman.com/>
>>>>
>>>>
>>>>
>>>> ________________________________
>>>> From: Gerard Toonstra <gtoons...@gmail.com>
>>>> Sent: 08 November 2016 18:19
>>>> To: dev@airflow.incubator.apache.org
>>>> Subject: Re: Skip task
>>>>
>>>> Also in 1.7.1.3, there's the ShortCircuitOperator, which can give you an
>>>> example.
>>>>
>>>> https://github.com/apache/incubator-airflow/blob/1.7.1.3/airflow/operators/python_operator.py
>>>>
>>>> You'd have to modify this to your needs, but the way it works is that if
>>>> the condition evaluates to False, none of the downstream tasks are
>>>> actually executed, they'd be skipped. The reason for putting them into
>>>> SKIPPED state is that the DAG final result would still be SUCCESS and
>>>> not failed.
>>>>
>>>> You could copy the operator from there and, instead of doing the full
>>>> "for loop", pick only the tasks immediately downstream from this
>>>> operator and skip those. Or... if you need to skip additional tasks
>>>> downstream, add a parameter "num_tasks" that decides on a halting
>>>> condition for the for loop.
>>>>
>>>> I believe that should work. I didn't try that here, but you can test
>>>> that and see what it does for you.
>>>>
>>>>
>>>> If you want this as a UI capability... for example have a human operator
>>>> decide whether or not to skip this, then maybe the best way forward
>>>> would be some kind of highly custom plugin with its own view. In the
>>>> end, you'd basically do the same action in the backend, whether the
>>>> python cond evaluates to False or the button is clicked.
>>>>
>>>> In the plugin case though, you'd have to keep the UI and the structure
>>>> of the DAG in sync and aligned, otherwise it'd become a mess.... Airflow
>>>> wasn't really developed for workflow/human interaction, but for
>>>> workflows where only automated processes are involved. That doesn't mean
>>>> that you can't do anything like that, but it may be costly resource-wise
>>>> to get this done. For example, on the basis of the BranchOperator, you
>>>> could call an external API to verify if a decision was taken on a case,
>>>> then follow branch A or B if the decision is there or put the state back
>>>> into UP_FOR_RETRY.
>>>> At the moment though, there's no programmatic way to reschedule that
>>>> task to some minutes or hours into the future before it's looked at
>>>> again, unless you really dive into airflow, scheduling semantics (@once
>>>> vs. other schedules) and how the scheduler works.
>>>>
>>>> Rgds,
>>>>
>>>> Gerard
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Nov 8, 2016 at 5:30 PM, Maycock, Luke <
>>>> luke.mayc...@affiliate.oliverwyman.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>>
>>>>> I am using Airflow 1.7.1.3 and have a particular requirement, which I
>>>>> don't think is currently supported by Airflow but just wanted to check
>>>>> in case I was missing something.
>>>>>
>>>>>
>>>>> I occasionally wish to skip a particular task in a given DAG run such
>>>>> that the task does not run for that DAG run. Is this functionality
>>>>> available in Airflow?
>>>>>
>>>>>
>>>>> I am aware of the BranchPythonOperator
>>>>> (https://airflow.incubator.apache.org/concepts.html#branching) but I
>>>>> don't believe this is exactly what I am looking for.
>>>>>
>>>>>
>>>>> I am thinking that a button in the UI alongside the 'Mark Success' and
>>>>> 'Run' buttons would be appropriate.
>>>>>
>>>>>
>>>>> If the functionality does not exist, does anyone have any suggestions
>>>>> on ways to implement this?
>>>>>
>>>>>
>>>>> Cheers,
>>>>> Luke Maycock
>>>>> OLIVER WYMAN
>>>>> luke.mayc...@affiliate.oliverwyman.com<mailto:luke.mayc...@affiliate.oliverwyman.com>
>>>>> www.oliverwyman.com<http://www.oliverwyman.com/>
>>>>>
>>>>>
>>>>> ________________________________
>>>>> This e-mail and any attachments may be confidential or legally
>>>>> privileged. If you received this message in error or are not the
>>>>> intended recipient, you should destroy the e-mail message and any
>>>>> attachments or copies, and you are prohibited from retaining,
>>>>> distributing, disclosing or using any information contained herein.
>>>>> Please inform us of the erroneous delivery by return e-mail. Thank you
>>>>> for your cooperation.