Have a look at: https://github.com/apache/incubator-airflow/blob/master/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py
Make sure to include "type_=mysql.DATETIME(fsp=6)" for your DateTime types on MySQL. - Bolke > Op 12 dec. 2016, om 11:33 heeft Maycock, Luke > <[email protected]> het volgende geschreven: > > It is a new table named 'TaskExclusion'. The migration script for this is as > follows: > > def upgrade(): > op.create_table( > 'task_exclusion', > sa.Column('id', sa.Integer(), nullable=False), > sa.Column('dag_id', sa.String(length=250), nullable=False), > sa.Column('task_id', sa.String(length=250), nullable=False), > sa.Column('exclusion_type', sa.String(length=32), nullable=False), > sa.Column('exclusion_start_date', sa.DateTime(), nullable=False), > sa.Column('exclusion_end_date', sa.DateTime(), nullable=False), > sa.Column('created_by', sa.String(length=256), nullable=False), > sa.Column('created_on', sa.DateTime(), nullable=False), > sa.PrimaryKeyConstraint('id')) > > > def downgrade(): > op.drop_table('task_exclusion') > > This is the PR for the exclusion of a task. We review our code internally > before setting up a PR into the main repo for the next review, hence the PR > being in our fork. The PR does not yet contain our unit tests. > > > Cheers, > Luke Maycock > OLIVER WYMAN > [email protected]<mailto:[email protected]> > www.oliverwyman.com<http://www.oliverwyman.com/> > > > > ________________________________ > From: Bolke de Bruin <[email protected]> > Sent: 09 December 2016 20:54 > To: [email protected] > Subject: Re: Skip task > > What table was this? I recently pushed a fix that allows fractional seconds > in our minimum supported version of MySQL (5.6.4 and beyond). > > I might have missed something. > > Thanks > Bolke > > Sent from my iPhone > >> On 9 Dec 2016, at 14:27, Maycock, Luke >> <[email protected]> wrote: >> >> I found the issue to be that, for MySQL, the datetime was being rounded to >> the nearest second. 
The strange thing is that if a datetime without the >> microseconds was passed to SQLAlchemy, the insertion into MySQL failed; but >> when a datetime with microseconds was passed, the microseconds are removed >> by rounding to the nearest second. >> >> >> Hopefully, this will prevent someone else going down the same rabbit hole >> that I did. >> >> >> Cheers, >> Luke Maycock >> OLIVER WYMAN >> [email protected]<mailto:[email protected]> >> www.oliverwyman.com<http://www.oliverwyman.com/> >> >> >> ________________________________ >> From: Maycock, Luke <[email protected]> >> Sent: 08 December 2016 10:44:32 >> To: [email protected] >> Subject: Re: Skip task >> >> Hi All, >> >> >> We have implemented a solution for allowing the exclusion of individual >> tasks during a DAG run. However, when writing unit tests for this, we are >> encountering an issue with MySQL, which I am hoping someone is able to help >> us with. >> >> >> For our solution, we have a new 'TaskExclusion' table in the meta-data. Our >> unit tests were run by Travis, not locally. >> >> >> The code block under test: >> >> >> class TaskExclusion(Base): >> """ >> This class is used to define objects that can be used to specify not to >> run a given task in a given dag on a variety of execution date conditions. >> These objects will be stored in the backend database in the task_exclusion >> table. >> Static methods are provided for the creation, removal and investigation of >> these objects. 
>> """ >> >> __tablename__ = "task_exclusion" >> >> id = Column(Integer(), primary_key=True) >> dag_id = Column(String(ID_LEN), nullable=False) >> task_id = Column(String(ID_LEN), nullable=False) >> exclusion_type = Column(String(32), nullable=False) >> exclusion_start_date = Column(DateTime, nullable=True) >> exclusion_end_date = Column(DateTime, nullable=True) >> created_by = Column(String(256), nullable=False) >> created_on = Column(DateTime, nullable=False) >> >> @classmethod >> @provide_session >> def set( >> cls, >> dag_id, >> task_id, >> exclusion_type, >> exclusion_start_date, >> exclusion_end_date, >> created_by, >> session=None): >> """ >> Add a task exclusion to prevent a task running under certain >> circumstances. >> :param dag_id: The dag_id of the DAG containing the task to exclude >> from execution. >> :param task_id: The task_id of the task to exclude from execution. >> :param exclusion_type: The type of circumstances to exclude the task >> from execution under. See the TaskExclusionType class for more detail. >> :param exclusion_start_date: The execution_date to start excluding on. >> This will be ignored if the exclusion_type is INDEFINITE. >> :param exclusion_end_date: The execution_date to stop excluding on. >> This will be ignored if the exclusion_type is INDEFINITE or >> SINGLE_DATE. >> :param created_by: Who is creating this exclusion. Stored with the >> exclusion record for auditing/debugging purposes. >> :return: None. 
>> """ >> >> session.expunge_all() >> >> # Set up execution date range correctly >> if exclusion_type == TaskExclusionType.SINGLE_DATE: >> if exclusion_start_date: >> exclusion_end_date = exclusion_start_date >> else: >> raise AirflowException( >> "No exclusion_start_date " >> ) >> elif exclusion_type == TaskExclusionType.DATE_RANGE: >> if exclusion_start_date > exclusion_end_date: >> raise AirflowException( >> "The exclusion_start_date is after the exclusion_end_date" >> ) >> elif exclusion_type == TaskExclusionType.INDEFINITE: >> exclusion_start_date = None >> exclusion_end_date = None >> else: >> raise AirflowException( >> "The exclusion_type, {}, is not recognised." >> .format(exclusion_type) >> ) >> >> # remove any duplicate exclusions >> session.query(cls).filter( >> cls.dag_id == dag_id, >> cls.task_id == task_id, >> cls.exclusion_type == exclusion_type, >> cls.exclusion_start_date == exclusion_start_date, >> cls.exclusion_end_date == exclusion_end_date >> ).delete() >> >> # insert new exclusion >> session.add(TaskExclusion( >> dag_id=dag_id, >> task_id=task_id, >> exclusion_type=exclusion_type, >> exclusion_start_date=exclusion_start_date, >> exclusion_end_date=exclusion_end_date, >> created_by=created_by, >> created_on=datetime.now()) >> ) >> >> session.commit() >> >> >> The unit test: >> >> class TaskExclusionTest(unittest.TestCase): >> def test_set_exclusion(self, session=None): >> >> session = settings.Session() >> >> session.expunge_all() >> >> dag_id = 'test_task_exclude' >> task_id = 'test_task_exclude' >> exec_date = datetime.datetime.now() >> >> TaskExclusion.set(dag_id=dag_id, >> task_id=task_id, >> exclusion_type=TaskExclusionType.SINGLE_DATE, >> exclusion_start_date=exec_date, >> exclusion_end_date=exec_date, >> created_by='airflow') >> >> >> exclusion = session.query(TaskExclusion).filter( >> TaskExclusion.dag_id == dag_id, >> TaskExclusion.task_id == task_id, >> TaskExclusion.exclusion_type == >> TaskExclusionType.SINGLE_DATE, >> 
TaskExclusion.exclusion_start_date == exec_date, >> TaskExclusion.exclusion_end_date == exec_date).first() >> >> self.assertTrue(exclusion) >> >> >> The unit test passes for PostgreSQL and SQLite but fails for MySQL. I have >> checked and the 'exclusion' variable contains a TaskExclusion object for >> PostgreSQL and SQLite but is set to 'None' for MySQL. Any suggestions on >> what could be causing this would be much appreciated. >> >> >> Cheers, >> Luke Maycock >> OLIVER WYMAN >> [email protected]<mailto:[email protected]> >> www.oliverwyman.com<http://www.oliverwyman.com/> >> >> >> >> ________________________________ >> From: siddharth anand <[email protected]> >> Sent: 16 November 2016 00:40 >> To: [email protected] >> Subject: Re: Skip task >> >> If your requirement is to skip a portion of tasks in a DagRun based on some >> state encountered while executing that DagRun, that is what >> BranchPythonOperator or ShortCircuitOperator (optionally paired with a >> Trigger Rule specified on a downstream task) is made for. >> >> These operators take a custom Python callable as an argument. The callable >> can check for the existence of data or files that should have been >> generated by an external system or an upstream task in the same DAG. The >> callables need to return a Boolean value in the case of the >> ShortCircuitOperator or a selected choice (i.e. branch to take) as in the >> case of the BranchPythonOperator. >> >> If you have 20 tasks that all depend on the presence of 20 different files, >> you would need 20 ShortCircuitOperator or BranchPythonOperator tasks each >> either sharing a common callable or each with its own callable. >> >> One could argue that these tasks are "overhead" because they just encompass >> some conditional or control logic and that DAGs should only contain >> workhorse tasks (i.e. tasks that do some work). DAGs with workhorse-only >> tasks are more of a pure dataflow approach -- i.e. no control-logic >> operators. 
However, I don't see another option. >> >> In the current system, a callable registered with a ShortCircuitOperator >> would check for the presence of a file -- if the file were not available, >> then a series of downstream tasks would be skipped in that DagRun, until a >> task with a Trigger_Rule="all_done" were encountered, downstream of which, >> tasks would no longer be skipped for the DagRun. >> >> I hope this makes sense. >> >> A long time ago, I proposed UI functionality to skip a series of DAG runs >> via the UI, because I knew that no data was available for that time range >> from an external system. I wanted to essentially specify a "blackout" >> period in terms of a time range that covered multiple DagRuns. My intention >> was for backfills to skip those days. It turns out that my company did not >> end up having such a requirement, so I dropped the feature request. >> >> If this is what you are asking for, then I am +1. Please implement it and >> submit a PR. >> >> On Tue, Nov 15, 2016 at 2:50 AM, Maycock, Luke < >> [email protected]> wrote: >> >>> Thank you for taking the time to respond. This is a great approach if you >>> know at the time of creating the DAG which tasks you expect to need to >>> skip. However, I don't think this is exactly the use case I have. For >>> example, I may be expecting a file to arrive in an FTP folder for loading >>> into a database but one day it doesn't arrive so I just want to skip that >>> task on that day. >>> >>> >>> Our workflows commonly have around 20 of these types of tasks in. I could >>> configure all of these tasks in the way you suggested in case I ever need >>> to skip one of them. However, I'd prefer not to have to set the tasks up >>> this way and instead have the ability just to skip a task on an ad-hoc >>> basis. I could then also use this functionality to add the ability to run >>> from a certain point in a DAG or to a certain point in the DAG. 
>>> >>> >>> >>> Thanks, >>> Luke Maycock >>> OLIVER WYMAN >>> [email protected]<mailto:luke. >>> [email protected]> >>> www.oliverwyman.com<http://www.oliverwyman.com/> >>> >>> >>> >>> ________________________________ >>> From: siddharth anand <[email protected]> >>> Sent: 14 November 2016 19:48 >>> To: [email protected] >>> Subject: Re: Skip task >>> >>> For cases like this, we (Agari) use the following approach: >>> >>> 1. Create a Variable in the UI of type boolean such as *enable_feature_x* >>> 2. Use a ShortCircuitOperator (or BranchPythonOperator) to Skip >>> downstream processing based on the value of *enable_feature_x* >>> 3. Assuming that you don't want to skip ALL downstream tasks, you can >>> use a trigger_rule of all_done to resume processing some portion of your >>> downstream DAG after skipping an upstream portion >>> >>> In other words, there is already a means to achieve what you are asking for >>> today. You can change the value of *enable_feature_x* via the UI. If you'd >>> like to enhance the UI to better capture this pattern, please submit a PR. >>> -s >>> >>> On Thu, Nov 10, 2016 at 1:20 PM, Maycock, Luke < >>> [email protected]> wrote: >>> >>>> Hi Gerard, >>>> >>>> >>>> I see the new status as having a number of uses: >>>> >>>> 1. A user can manually set a task to skip in a DAG run via the UI. >>>> 2. We can then make use of this new status to add the following >>>> functionality to Airflow: >>>> * Run a DAG run up to a certain point and have the rest of the >>> tasks >>>> have the new status. >>>> * Run a DAG run from a certain task to the end, setting all >>>> pre-requisite tasks to have this new status. >>>> >>>> I am happy to be challenged on the above use cases if there are better >>>> ways to achieve the same things. >>>> >>>> Cheers, >>>> Luke Maycock >>>> OLIVER WYMAN >>>> [email protected]<mailto:luke. 
>>>> [email protected]> >>>> www.oliverwyman.com<http://www.oliverwyman.com/> >>>> >>>> >>>> >>>> ________________________________ >>>> From: Gerard Toonstra <[email protected]> >>>> Sent: 09 November 2016 18:08 >>>> To: [email protected] >>>> Subject: Re: Skip task >>>> >>>> Hey Luke, >>>> >>>> Who or what makes the decision to skip processing that task? >>>> >>>> Rgds, >>>> >>>> Gerard >>>> >>>> On Wed, Nov 9, 2016 at 2:39 PM, Maycock, Luke < >>>> [email protected]> wrote: >>>> >>>>> Hi Gerard, >>>>> >>>>> >>>>> Thank you for your quick response. >>>>> >>>>> >>>>> I am not trying to implement this for a specific operator but rather >>>>> trying to add it as a feature for any task in any DAG. >>>>> >>>>> >>>>> Given that the skipped states propagate where all directly upstream >>> tasks >>>>> are skipped, I don't think this is the state we want to use. For the >>>>> functionality I'm looking for, I think I'll need to introduce a new >>>> status, >>>>> maybe 'disabled'. >>>>> >>>>> >>>>> Again, thanks for your response. >>>>> >>>>> >>>>> Cheers, >>>>> Luke Maycock >>>>> OLIVER WYMAN >>>>> [email protected]<mailto:luke. >>>>> [email protected]> >>>>> www.oliverwyman.com<http://www.oliverwyman.com/> >>>>> >>>>> >>>>> >>>>> ________________________________ >>>>> From: Gerard Toonstra <[email protected]> >>>>> Sent: 08 November 2016 18:19 >>>>> To: [email protected] >>>>> Subject: Re: Skip task >>>>> >>>>> Also in 1.7.1.3, there's the ShortCircuitOperator, which can give you >>> an >>>>> example. >>>>> >>>>> https://github.com/apache/incubator-airflow/blob/1.7.1. >>>>> 3/airflow/operators/python_operator.py >>>>> >>>>> You'd have to modify this to your needs, but the way it works is that >>> if >>>>> the condition evaluates to True, none of the >>>>> downstream tasks are actually executed, they'd be skipped. The reason >>> for >>>>> putting them into SKIPPED state is that >>>>> the DAG final result would still be SUCCESS and not failed. 
>>>>> >>>>> You could copy the operator from there and don't do the full "for >>> loop", >>>>> only pick the tasks immediately downstream >>>>> from this operator and skip that. Or... if you need to skip additional >>>>> tasks downstream, add a parameter "num_tasks" >>>>> that decide on a halting condition for the for loop. >>>>> >>>>> I believe that should work. I didn't try that here, but you can test >>> that >>>>> and see what it does for you. >>>>> >>>>> >>>>> If you want this as a UI capability... for example have a human >>> operator >>>>> decide on skipping this yes or not, then >>>>> maybe the best way forward would be some kind of highly custom plugin >>>> with >>>>> its own view. In the end, you'd basically >>>>> do the same action in the backend, whether the python cond evaluates to >>>>> True or the button is clicked. >>>>> >>>>> In the plugin case though, you'd have to keep the UI and the structure >>> of >>>>> the DAG in sync and aligned, otherwise >>>>> it'd become a mess.... Airflow wasn't really developed for >>> workflow/human >>>>> interaction, but in workflows where only >>>>> automated processes are involved. That doesn't mean that you can't do >>>>> anything like that, but it may be costly resource >>>>> wise to get this done. For example, on the basis of the BranchOperator, >>>> you >>>>> could call an external API to verify if a decision >>>>> was taken on a case, then follow branch A or B if the decision is there >>>> or >>>>> put the state back into UP_FOR_RETRY. >>>>> At the moment though, there's no programmatic way to reschedule that >>> task >>>>> to some minutes or hours into the future before >>>>> it's looked at again, unless you really dive into airflow, scheduling >>>>> semantics (@once vs. other schedules) and how >>>>> the scheduler works. 
>>>>> >>>>> Rgds, >>>>> >>>>> Gerard >>>>> >>>>> >>>>> >>>>> >>>>> On Tue, Nov 8, 2016 at 5:30 PM, Maycock, Luke < >>>>> [email protected]> wrote: >>>>> >>>>>> Hi All, >>>>>> >>>>>> >>>>>> I am using Airflow 1.7.1.3 and have a particular requirement, which I >>>>>> don't think is currently supported by Airflow but just wanted to >>> check >>>> in >>>>>> case I was missing something. >>>>>> >>>>>> >>>>>> I occasionally wish to skip a particular task in a given DAG run such >>>>> that >>>>>> the task does not run for that DAG run. Is this functionality >>> available >>>>> in >>>>>> Airflow? >>>>>> >>>>>> >>>>>> I am aware of the BranchPythonOperator (https://airflow.incubator. >>>>>> apache.org/concepts.html#branching) but I don't believe this >>> is >>>>>> exactly what I am looking for. >>>>>> >>>>>> >>>>>> I am thinking that a button in the UI alongside the 'Mark Success' >>> and >>>>>> 'Run' buttons would be appropriate. >>>>>> >>>>>> >>>>>> If the functionality does not exist, does anyone have any suggestions >>>> on >>>>>> ways to implement this? >>>>>> >>>>>> >>>>>> Cheers, >>>>>> Luke Maycock >>>>>> OLIVER WYMAN >>>>>> [email protected]<mailto:luke. >>>>>> [email protected]> >>>>>> www.oliverwyman.com<http://www.oliverwyman.com/> >>>>>> >>>>>> >>>>>> ________________________________ >>>>>> This e-mail and any attachments may be confidential or legally >>>>> privileged. >>>>>> If you received this message in error or are not the intended >>>> recipient, >>>>>> you should destroy the e-mail message and any attachments or copies, >>>> and >>>>>> you are prohibited from retaining, distributing, disclosing or using >>>> any >>>>>> information contained herein. Please inform us of the erroneous >>>> delivery >>>>> by >>>>>> return e-mail. Thank you for your cooperation. >>>>>> >>>>> >>>>> ________________________________ >>>>> This e-mail and any attachments may be confidential or legally >>>> privileged. 
>>>>> If you received this message in error or are not the intended >>> recipient, >>>>> you should destroy the e-mail message and any attachments or copies, >>> and >>>>> you are prohibited from retaining, distributing, disclosing or using >>> any >>>>> information contained herein. Please inform us of the erroneous >>> delivery >>>> by >>>>> return e-mail. Thank you for your cooperation. >>>>> >>>> >>>> ________________________________ >>>> This e-mail and any attachments may be confidential or legally >>> privileged. >>>> If you received this message in error or are not the intended recipient, >>>> you should destroy the e-mail message and any attachments or copies, and >>>> you are prohibited from retaining, distributing, disclosing or using any >>>> information contained herein. Please inform us of the erroneous delivery >>> by >>>> return e-mail. Thank you for your cooperation. >>>> >>> >>> ________________________________ >>> This e-mail and any attachments may be confidential or legally privileged. >>> If you received this message in error or are not the intended recipient, >>> you should destroy the e-mail message and any attachments or copies, and >>> you are prohibited from retaining, distributing, disclosing or using any >>> information contained herein. Please inform us of the erroneous delivery by >>> return e-mail. Thank you for your cooperation. >>> >> >> ________________________________ >> This e-mail and any attachments may be confidential or legally privileged. >> If you received this message in error or are not the intended recipient, you >> should destroy the e-mail message and any attachments or copies, and you are >> prohibited from retaining, distributing, disclosing or using any information >> contained herein. Please inform us of the erroneous delivery by return >> e-mail. Thank you for your cooperation. >> >> ________________________________ >> This e-mail and any attachments may be confidential or legally privileged. 
>> If you received this message in error or are not the intended recipient, you >> should destroy the e-mail message and any attachments or copies, and you are >> prohibited from retaining, distributing, disclosing or using any information >> contained herein. Please inform us of the erroneous delivery by return >> e-mail. Thank you for your cooperation. > > ________________________________ > This e-mail and any attachments may be confidential or legally privileged. If > you received this message in error or are not the intended recipient, you > should destroy the e-mail message and any attachments or copies, and you are > prohibited from retaining, distributing, disclosing or using any information > contained herein. Please inform us of the erroneous delivery by return > e-mail. Thank you for your cooperation.
