I found the issue to be that, for MySQL, the datetime was being rounded to the nearest second. The strange thing is that if a datetime without microseconds was passed to SQLAlchemy, the insertion into MySQL failed; but when a datetime with microseconds was passed, the microseconds were removed by rounding to the nearest second. As a result, the stored value no longer equalled the exec_date used in the test's filter, so the query returned None.
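A minimal sketch of why the lookup fails. It assumes MySQL's default behaviour for a `DATETIME` column declared without fractional-seconds precision (which is what SQLAlchemy's generic `DateTime` produces on MySQL): from MySQL 5.6.4 onwards, the fractional part is rounded on insert. It also assumes the driver sends the microseconds in the filter parameter. `simulate_mysql_datetime` and `truncate_to_second` are hypothetical helpers, and no database is needed to run the example.

```python
from datetime import datetime, timedelta


def simulate_mysql_datetime(value):
    """Hypothetical stand-in for storing `value` in a MySQL DATETIME column
    with no fractional-seconds precision: the microseconds are rounded to
    the nearest whole second (half a second or more rounds up)."""
    whole_seconds = value.replace(microsecond=0)
    if value.microsecond >= 500000:
        whole_seconds += timedelta(seconds=1)
    return whole_seconds


def truncate_to_second(value):
    """Workaround: drop the microseconds before the value is written or
    used in a filter, so the stored value and the filter value match."""
    return value.replace(microsecond=0)


exec_date = datetime(2016, 12, 8, 10, 44, 32, 750000)

# What the test effectively did: store the value, then filter on the original.
stored = simulate_mysql_datetime(exec_date)
print(stored == exec_date)  # False: 10:44:33 is not 10:44:32.750000

# Truncating first makes the round trip lossless.
safe_date = truncate_to_second(exec_date)
print(simulate_mysql_datetime(safe_date) == safe_date)  # True
```

In the unit test, this would mean building `exec_date` as `datetime.datetime.now().replace(microsecond=0)`. Another option is to declare the columns with `sqlalchemy.dialects.mysql.DATETIME(fsp=6)`, which should preserve the microseconds on MySQL 5.6.4 or later. That would be a MySQL-specific schema change, though.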
Hopefully, this will prevent someone else from going down the same rabbit hole that I did.

Cheers,
Luke Maycock
OLIVER WYMAN
[email protected]
www.oliverwyman.com

________________________________
From: Maycock, Luke <[email protected]>
Sent: 08 December 2016 10:44:32
To: [email protected]
Subject: Re: Skip task

Hi All,

We have implemented a solution for allowing the exclusion of individual tasks during a DAG run. However, when writing unit tests for this, we are encountering an issue with MySQL, which I am hoping someone is able to help us with.

For our solution, we have a new 'TaskExclusion' table in the metadata database. Our unit tests were run by Travis, not locally.

The code block under test:

    class TaskExclusion(Base):
        """
        This class is used to define objects that can be used to specify not
        to run a given task in a given dag on a variety of execution date
        conditions. These objects will be stored in the backend database in
        the task_exclusion table. Static methods are provided for the
        creation, removal and investigation of these objects.
        """
        __tablename__ = "task_exclusion"

        id = Column(Integer(), primary_key=True)
        dag_id = Column(String(ID_LEN), nullable=False)
        task_id = Column(String(ID_LEN), nullable=False)
        exclusion_type = Column(String(32), nullable=False)
        exclusion_start_date = Column(DateTime, nullable=True)
        exclusion_end_date = Column(DateTime, nullable=True)
        created_by = Column(String(256), nullable=False)
        created_on = Column(DateTime, nullable=False)

        @classmethod
        @provide_session
        def set(
                cls,
                dag_id,
                task_id,
                exclusion_type,
                exclusion_start_date,
                exclusion_end_date,
                created_by,
                session=None):
            """
            Add a task exclusion to prevent a task running under certain
            circumstances.

            :param dag_id: The dag_id of the DAG containing the task to
                exclude from execution.
            :param task_id: The task_id of the task to exclude from execution.
            :param exclusion_type: The type of circumstances to exclude the
                task from execution under. See the TaskExclusionType class
                for more detail.
            :param exclusion_start_date: The execution_date to start excluding
                on. This will be ignored if the exclusion_type is INDEFINITE.
            :param exclusion_end_date: The execution_date to stop excluding
                on. This will be ignored if the exclusion_type is INDEFINITE
                or SINGLE_DATE.
            :param created_by: Who is creating this exclusion. Stored with the
                exclusion record for auditing/debugging purposes.
            :return: None.
            """
            session.expunge_all()

            # Set up execution date range correctly
            if exclusion_type == TaskExclusionType.SINGLE_DATE:
                if exclusion_start_date:
                    exclusion_end_date = exclusion_start_date
                else:
                    raise AirflowException(
                        "No exclusion_start_date"
                    )
            elif exclusion_type == TaskExclusionType.DATE_RANGE:
                if exclusion_start_date > exclusion_end_date:
                    raise AirflowException(
                        "The exclusion_start_date is after the exclusion_end_date"
                    )
            elif exclusion_type == TaskExclusionType.INDEFINITE:
                exclusion_start_date = None
                exclusion_end_date = None
            else:
                raise AirflowException(
                    "The exclusion_type, {}, is not recognised."
                    .format(exclusion_type)
                )

            # remove any duplicate exclusions
            session.query(cls).filter(
                cls.dag_id == dag_id,
                cls.task_id == task_id,
                cls.exclusion_type == exclusion_type,
                cls.exclusion_start_date == exclusion_start_date,
                cls.exclusion_end_date == exclusion_end_date
            ).delete()

            # insert new exclusion
            session.add(TaskExclusion(
                dag_id=dag_id,
                task_id=task_id,
                exclusion_type=exclusion_type,
                exclusion_start_date=exclusion_start_date,
                exclusion_end_date=exclusion_end_date,
                created_by=created_by,
                created_on=datetime.now())
            )

            session.commit()

The unit test:

    class TaskExclusionTest(unittest.TestCase):
        def test_set_exclusion(self, session=None):
            session = settings.Session()
            session.expunge_all()

            dag_id = 'test_task_exclude'
            task_id = 'test_task_exclude'
            exec_date = datetime.datetime.now()

            TaskExclusion.set(
                dag_id=dag_id,
                task_id=task_id,
                exclusion_type=TaskExclusionType.SINGLE_DATE,
                exclusion_start_date=exec_date,
                exclusion_end_date=exec_date,
                created_by='airflow')

            exclusion = session.query(TaskExclusion).filter(
                TaskExclusion.dag_id == dag_id,
                TaskExclusion.task_id == task_id,
                TaskExclusion.exclusion_type == TaskExclusionType.SINGLE_DATE,
                TaskExclusion.exclusion_start_date == exec_date,
                TaskExclusion.exclusion_end_date == exec_date).first()

            self.assertTrue(exclusion)

The unit test passes for PostgreSQL and SQLite but fails for MySQL. I have checked, and the 'exclusion' variable contains a TaskExclusion object for PostgreSQL and SQLite but is set to 'None' for MySQL.

Any suggestions on what could be causing this would be much appreciated.
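The `set()` docstring above describes three exclusion types. Below is a minimal sketch of how a stored exclusion might be matched against an execution date. `is_excluded` and the string values in this stand-in `TaskExclusionType` are hypothetical, since the real class is not shown in the thread. Treating DATE_RANGE as inclusive at both ends is also an assumption.

```python
from datetime import datetime


class TaskExclusionType(object):
    # Hypothetical stand-in for the TaskExclusionType class referenced in
    # set(); the real values are not shown in the thread.
    INDEFINITE = "INDEFINITE"
    SINGLE_DATE = "SINGLE_DATE"
    DATE_RANGE = "DATE_RANGE"


def is_excluded(exclusion_type, start_date, end_date, execution_date):
    """Return True if an exclusion with these fields covers execution_date.

    Mirrors the normalisation done in set(): INDEFINITE ignores both dates,
    SINGLE_DATE compares against the start date only, and DATE_RANGE is
    assumed to be inclusive at both ends.
    """
    if exclusion_type == TaskExclusionType.INDEFINITE:
        return True
    if exclusion_type == TaskExclusionType.SINGLE_DATE:
        return execution_date == start_date
    if exclusion_type == TaskExclusionType.DATE_RANGE:
        return start_date <= execution_date <= end_date
    raise ValueError("Unrecognised exclusion_type: {}".format(exclusion_type))


day = datetime(2016, 12, 8)
print(is_excluded(TaskExclusionType.SINGLE_DATE, day, day, day))  # True
print(is_excluded(TaskExclusionType.DATE_RANGE,
                  datetime(2016, 12, 1), datetime(2016, 12, 7), day))  # False
```

The SINGLE_DATE branch relies on exact datetime equality. That is exactly the kind of comparison that breaks on MySQL once the stored value has been rounded to whole seconds.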

Cheers,
Luke Maycock

________________________________
From: siddharth anand <[email protected]>
Sent: 16 November 2016 00:40
To: [email protected]
Subject: Re: Skip task

If your requirement is to skip a portion of tasks in a DagRun based on some state encountered while executing that DagRun, that is what BranchPythonOperator or ShortCircuitOperator (optionally paired with a trigger rule specified on a downstream task) is made for. These operators take a custom Python callable as an argument. The callable can check for the existence of data or files that should have been generated by an external system or an upstream task in the same DAG. The callable needs to return a Boolean value in the case of the ShortCircuitOperator, or a selected choice (i.e. the branch to take) in the case of the BranchPythonOperator.

If you have 20 tasks that all depend on the presence of 20 different files, you would need 20 ShortCircuitOperator or BranchPythonOperator tasks, either sharing a common callable or each with its own callable. One could argue that these tasks are "overhead" because they just encompass some conditional or control logic, and that DAGs should only contain workhorse tasks (i.e. tasks that do some work). DAGs with workhorse-only tasks are more of a pure dataflow approach, i.e. no control-logic operators. However, I don't see another option.

In the current system, a callable registered with a ShortCircuitOperator would check for the presence of a file. If the file were not available, then a series of downstream tasks would be skipped in that DagRun until a task with trigger_rule="all_done" was encountered; downstream of that task, tasks would no longer be skipped for the DagRun.

I hope this makes sense.
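Below is a toy, pure-Python model of the pattern described above. It is not Airflow's scheduler code. A shared callable factory produces one check per expected file, and the check decides whether a linear chain of downstream tasks runs. When the check returns False, tasks are marked skipped until one with the `all_done` trigger rule is reached. `make_file_check` and `run_chain` are hypothetical names. In a real DAG, the callable would be passed to `ShortCircuitOperator(python_callable=...)`.

```python
import os
import tempfile


def make_file_check(path):
    """Build a callable that returns True only if `path` exists.

    One factory lets many short-circuit tasks share the same logic,
    each checking its own file."""
    def check():
        return os.path.exists(path)
    return check


def run_chain(condition, downstream):
    """Toy model of a short-circuit task followed by a linear chain.

    `downstream` is an ordered list of (task_id, trigger_rule) pairs.
    Returns a dict mapping task_id to the state the model assigns."""
    states = {}
    skipping = not condition()
    for task_id, trigger_rule in downstream:
        if skipping and trigger_rule == "all_done":
            # an all_done task runs once its upstream is finished, even if skipped
            skipping = False
        states[task_id] = "skipped" if skipping else "success"
    return states


chain = [("load", "all_success"), ("transform", "all_success"),
         ("report", "all_done")]

with tempfile.TemporaryDirectory() as tmp:
    present = os.path.join(tmp, "data.csv")
    open(present, "w").close()
    missing = os.path.join(tmp, "missing.csv")
    print(run_chain(make_file_check(present), chain))  # all "success"
    print(run_chain(make_file_check(missing), chain))  # load/transform skipped
```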

A long time ago, I proposed UI functionality to skip a series of DAG runs, because I knew that no data was available for that time range from an external system. I wanted to essentially specify a "blackout" period in terms of a time range that covered multiple DagRuns. My intention was for backfills to skip those days. It turns out that my company did not end up having such a requirement, so I dropped the feature request. If this is what you are asking for, then I am +1. Please implement it and submit a PR.

On Tue, Nov 15, 2016 at 2:50 AM, Maycock, Luke <[email protected]> wrote:

> Thank you for taking the time to respond. This is a great approach if you know at the time of creating the DAG which tasks you expect to need to skip. However, I don't think this is exactly the use case I have. For example, I may be expecting a file to arrive in an FTP folder for loading into a database, but one day it doesn't arrive, so I just want to skip that task on that day.
>
> Our workflows commonly have around 20 of these types of tasks in them. I could configure all of these tasks in the way you suggested in case I ever need to skip one of them. However, I'd prefer not to have to set the tasks up this way and instead have the ability just to skip a task on an ad-hoc basis. I could then also use this functionality to add the ability to run from a certain point in a DAG or to a certain point in the DAG.
>
> Thanks,
> Luke Maycock
>
> ________________________________
> From: siddharth anand <[email protected]>
> Sent: 14 November 2016 19:48
> To: [email protected]
> Subject: Re: Skip task
>
> For cases like this, we (Agari) use the following approach:
>
> 1. Create a Variable in the UI of type boolean, such as *enable_feature_x*
> 2. Use a ShortCircuitOperator (or BranchPythonOperator) to skip downstream processing based on the value of *enable_feature_x*
> 3. Assuming that you don't want to skip ALL downstream tasks, you can use a trigger_rule of all_done to resume processing some portion of your downstream DAG after skipping an upstream portion
>
> In other words, there is already a means to achieve what you are asking for today. You can change the value of *enable_feature_x* via the UI. If you'd like to enhance the UI to better capture this pattern, pls submit a PR.
> -s
>
> On Thu, Nov 10, 2016 at 1:20 PM, Maycock, Luke <[email protected]> wrote:
>
> > Hi Gerard,
> >
> > I see the new status as having a number of uses:
> >
> > 1. A user can manually set a task to skip in a DAG run via the UI.
> > 2. We can then make use of this new status to add the following functionality to Airflow:
> >    * Run a DAG run up to a certain point and have the rest of the tasks have the new status.
> >    * Run a DAG run from a certain task to the end, setting all prerequisite tasks to have this new status.
> >
> > I am happy to be challenged on the above use cases if there are better ways to achieve the same things.
> >
> > Cheers,
> > Luke Maycock
> >
> > ________________________________
> > From: Gerard Toonstra <[email protected]>
> > Sent: 09 November 2016 18:08
> > To: [email protected]
> > Subject: Re: Skip task
> >
> > Hey Luke,
> >
> > Who or what makes the decision to skip processing that task?
> >
> > Rgds,
> >
> > Gerard
> >
> > On Wed, Nov 9, 2016 at 2:39 PM, Maycock, Luke <[email protected]> wrote:
> >
> > > Hi Gerard,
> > >
> > > Thank you for your quick response.
> > >
> > > I am not trying to implement this for a specific operator but rather trying to add it as a feature for any task in any DAG.
> > >
> > > Given that the skipped state propagates to a task when all of its directly upstream tasks are skipped, I don't think this is the state we want to use. For the functionality I'm looking for, I think I'll need to introduce a new status, maybe 'disabled'.
> > >
> > > Again, thanks for your response.
> > >
> > > Cheers,
> > > Luke Maycock
> > >
> > > ________________________________
> > > From: Gerard Toonstra <[email protected]>
> > > Sent: 08 November 2016 18:19
> > > To: [email protected]
> > > Subject: Re: Skip task
> > >
> > > Also in 1.7.1.3, there's the ShortCircuitOperator, which can give you an example.
> > >
> > > https://github.com/apache/incubator-airflow/blob/1.7.1.3/airflow/operators/python_operator.py
> > >
> > > You'd have to modify this to your needs, but the way it works is that if the callable's condition is not met (it returns a falsy value), none of the downstream tasks are actually executed; they'd be skipped. The reason for putting them into the SKIPPED state is that the DAG's final result would still be SUCCESS and not FAILED.
> > >
> > > You could copy the operator from there and, instead of doing the full "for loop", only pick the tasks immediately downstream from this operator and skip those. Or, if you need to skip additional tasks downstream, add a parameter "num_tasks" that decides on a halting condition for the for loop.
> > >
> > > I believe that should work. I didn't try that here, but you can test that and see what it does for you.
> > >
> > > If you want this as a UI capability, for example to have a human operator decide whether or not to skip this, then maybe the best way forward would be some kind of highly custom plugin with its own view.
> > > In the end, you'd basically do the same action in the backend, whether the Python condition triggers the skip or the button is clicked.
> > >
> > > In the plugin case, though, you'd have to keep the UI and the structure of the DAG in sync and aligned; otherwise it'd become a mess. Airflow wasn't really developed for workflow/human interaction, but for workflows where only automated processes are involved. That doesn't mean that you can't do anything like that, but it may be costly resource-wise to get this done. For example, on the basis of the BranchPythonOperator, you could call an external API to verify whether a decision was taken on a case, then follow branch A or B if the decision is there, or put the state back into UP_FOR_RETRY. At the moment, though, there's no programmatic way to reschedule that task to some minutes or hours into the future before it's looked at again, unless you really dive into Airflow, scheduling semantics (@once vs. other schedules) and how the scheduler works.
> > >
> > > Rgds,
> > >
> > > Gerard
> > >
> > > On Tue, Nov 8, 2016 at 5:30 PM, Maycock, Luke <[email protected]> wrote:
> > >
> > > > Hi All,
> > > >
> > > > I am using Airflow 1.7.1.3 and have a particular requirement, which I don't think is currently supported by Airflow, but I just wanted to check in case I was missing something.
> > > >
> > > > I occasionally wish to skip a particular task in a given DAG run such that the task does not run for that DAG run. Is this functionality available in Airflow?
> > > >
> > > > I am aware of the BranchPythonOperator (https://airflow.incubator.apache.org/concepts.html#branching), but I don't believe this is exactly what I am looking for.
> > > >
> > > > I am thinking that a button in the UI alongside the 'Mark Success' and 'Run' buttons would be appropriate.
> > > >
> > > > If the functionality does not exist, does anyone have any suggestions on ways to implement this?
> > > >
> > > > Cheers,
> > > > Luke Maycock
> > > >
> > > > ________________________________
> > > > This e-mail and any attachments may be confidential or legally privileged. If you received this message in error or are not the intended recipient, you should destroy the e-mail message and any attachments or copies, and you are prohibited from retaining, distributing, disclosing or using any information contained herein. Please inform us of the erroneous delivery by return e-mail. Thank you for your cooperation.
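Gerard's 8 November suggestion above was to skip only the tasks immediately downstream of the operator, or to bound the loop with a "num_tasks" parameter. That amounts to a depth-limited walk of the downstream graph. Below is a minimal sketch over a plain dict of dependencies rather than Airflow task objects. `tasks_to_skip`, the dict format, and reading num_tasks as a number of levels (hops) below the operator are all assumptions. In a copied operator, each selected task's task instance would then be set to SKIPPED instead of being returned as a set.

```python
from collections import deque


def tasks_to_skip(downstream, start, num_levels=1):
    """Return the task_ids within `num_levels` hops downstream of `start`.

    `downstream` maps each task_id to the task_ids directly downstream of it.
    num_levels=1 gives "only the tasks immediately downstream";
    num_levels=None walks the whole downstream graph, like the full loop.
    """
    selected = set()
    queue = deque([(start, 0)])  # breadth-first, so depths are minimal
    while queue:
        task_id, depth = queue.popleft()
        if num_levels is not None and depth >= num_levels:
            continue  # halting condition: look no further down this path
        for child in downstream.get(task_id, []):
            if child not in selected:
                selected.add(child)
                queue.append((child, depth + 1))
    return selected


graph = {
    "check_file": ["load"],
    "load": ["transform"],
    "transform": ["report"],
}
print(sorted(tasks_to_skip(graph, "check_file")))        # ['load']
print(sorted(tasks_to_skip(graph, "check_file", 2)))     # ['load', 'transform']
print(sorted(tasks_to_skip(graph, "check_file", None)))  # ['load', 'report', 'transform']
```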
