Excellent - thanks for your help Bolke, much appreciated!

Cheers,
Luke Maycock
OLIVER WYMAN
[email protected]<mailto:[email protected]>
www.oliverwyman.com<http://www.oliverwyman.com/>



________________________________
From: Bolke de Bruin <[email protected]>
Sent: 12 December 2016 10:40
To: [email protected]
Subject: Re: Skip task

Have a look at: 
https://github.com/apache/incubator-airflow/blob/master/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py

Make sure to include "type_=mysql.DATETIME(fsp=6)" for your DateTime types on
MySQL.
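
As a rough sketch of that advice applied to the task_exclusion table from this
thread (this is not the code from the linked migration): one option, assuming
SQLAlchemy is installed, is to keep the generic DateTime type and attach a
MySQL-only variant with fsp=6. The `FractionalDateTime` name, the `metadata`
object and the trimmed column list are illustrative assumptions; a real
migration would pass the same column types to op.create_table.

```python
import sqlalchemy as sa
from sqlalchemy.dialects import mysql, postgresql
from sqlalchemy.schema import CreateTable

# Generic DateTime everywhere, DATETIME(6) (microsecond precision) on MySQL only.
# "FractionalDateTime" is a name made up for this sketch.
FractionalDateTime = sa.DateTime().with_variant(mysql.DATETIME(fsp=6), "mysql")

metadata = sa.MetaData()  # illustrative stand-in for the migration context
task_exclusion = sa.Table(
    "task_exclusion",
    metadata,
    sa.Column("id", sa.Integer(), primary_key=True),
    sa.Column("exclusion_start_date", FractionalDateTime),
    sa.Column("exclusion_end_date", FractionalDateTime),
    sa.Column("created_on", FractionalDateTime, nullable=False),
)

# Render the CREATE TABLE statement for two backends without connecting to either.
mysql_ddl = str(CreateTable(task_exclusion).compile(dialect=mysql.dialect()))
postgres_ddl = str(CreateTable(task_exclusion).compile(dialect=postgresql.dialect()))
print(mysql_ddl)
```

Only the MySQL rendering should carry the fractional-seconds precision; the
other backends keep their default timestamp type.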

- Bolke



> On 12 Dec 2016, at 11:33, Maycock, Luke
> <[email protected]> wrote:
>
> It is a new table named 'TaskExclusion'. The migration script for this is as 
> follows:
>
> def upgrade():
>     op.create_table(
>         'task_exclusion',
>         sa.Column('id', sa.Integer(), nullable=False),
>         sa.Column('dag_id', sa.String(length=250), nullable=False),
>         sa.Column('task_id', sa.String(length=250), nullable=False),
>         sa.Column('exclusion_type', sa.String(length=32), nullable=False),
>         sa.Column('exclusion_start_date', sa.DateTime(), nullable=False),
>         sa.Column('exclusion_end_date', sa.DateTime(), nullable=False),
>         sa.Column('created_by', sa.String(length=256), nullable=False),
>         sa.Column('created_on', sa.DateTime(), nullable=False),
>         sa.PrimaryKeyConstraint('id'))
>
>
> def downgrade():
>     op.drop_table('task_exclusion')
>
> This is the PR for the exclusion of a task. We review our code internally 
> before setting up a PR into the main repo for the next review, hence the PR 
> being in our fork. The PR does not yet contain our unit tests.
>
>
> Cheers,
> Luke Maycock
>
>
>
> ________________________________
> From: Bolke de Bruin <[email protected]>
> Sent: 09 December 2016 20:54
> To: [email protected]
> Subject: Re: Skip task
>
> What table was this? I recently pushed a fix that allows fractional seconds 
> in our minimum supported version of MySQL (5.6.4 and beyond).
>
> I might have missed something.
>
> Thanks
> Bolke
>
> Sent from my iPhone
>
>> On 9 Dec 2016, at 14:27, Maycock, Luke 
>> <[email protected]> wrote:
>>
>> I found the issue to be that, for MySQL, the datetime was being rounded to
>> the nearest second. The strange thing is that when a datetime without
>> microseconds was passed to SQLAlchemy, the insertion into MySQL failed, but
>> when a datetime with microseconds was passed, the microseconds were removed
>> by rounding to the nearest second.
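
To illustrate the mismatch described above using only the standard library: a
column without fractional-second support hands back a value with no
microseconds, so an equality filter against the original datetime finds
nothing. The `store_without_fraction` helper is a hypothetical stand-in for the
database round trip (it rounds to the nearest second, as reported above); it is
not SQLAlchemy or MySQL code.

```python
from datetime import datetime, timedelta


def store_without_fraction(value):
    """Hypothetical stand-in for a DATETIME column with no fractional seconds:
    round to the nearest second, as observed in this thread."""
    rounded = value + timedelta(microseconds=500000)
    return rounded.replace(microsecond=0)


exec_date = datetime(2016, 12, 9, 14, 27, 3, 654321)  # carries microseconds
stored = store_without_fraction(exec_date)

# The equality comparison used by the unit test's filter can no longer match.
matches_raw = stored == exec_date

# Dropping the microseconds before writing and querying makes both sides agree.
safe_date = exec_date.replace(microsecond=0)
matches_safe = store_without_fraction(safe_date) == safe_date

print(matches_raw, matches_safe)
```

Truncating in the test is only a workaround; storing the fractional seconds in
the column avoids the mismatch at the source.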
>>
>>
>> Hopefully, this will prevent someone else going down the same rabbit hole 
>> that I did.
>>
>>
>> Cheers,
>> Luke Maycock
>>
>>
>> ________________________________
>> From: Maycock, Luke <[email protected]>
>> Sent: 08 December 2016 10:44:32
>> To: [email protected]
>> Subject: Re: Skip task
>>
>> Hi All,
>>
>>
>> We have implemented a solution for allowing the exclusion of individual 
>> tasks during a DAG run. However, when writing unit tests for this, we are 
>> encountering an issue with MySQL, which I am hoping someone is able to help 
>> us with.
>>
>>
>> For our solution, we have a new 'TaskExclusion' table in the meta-data. Our 
>> unit tests were run by Travis, not locally.
>>
>>
>> The code block under test:
>>
>>
>> class TaskExclusion(Base):
>>     """
>>     This class is used to define objects that can be used to specify not to
>>     run a given task in a given dag on a variety of execution date conditions.
>>     These objects will be stored in the backend database in the task_exclusion
>>     table.
>>     Static methods are provided for the creation, removal and investigation of
>>     these objects.
>>     """
>>
>>     __tablename__ = "task_exclusion"
>>
>>     id = Column(Integer(), primary_key=True)
>>     dag_id = Column(String(ID_LEN), nullable=False)
>>     task_id = Column(String(ID_LEN), nullable=False)
>>     exclusion_type = Column(String(32), nullable=False)
>>     exclusion_start_date = Column(DateTime, nullable=True)
>>     exclusion_end_date = Column(DateTime, nullable=True)
>>     created_by = Column(String(256), nullable=False)
>>     created_on = Column(DateTime, nullable=False)
>>
>>     @classmethod
>>     @provide_session
>>     def set(
>>             cls,
>>             dag_id,
>>             task_id,
>>             exclusion_type,
>>             exclusion_start_date,
>>             exclusion_end_date,
>>             created_by,
>>             session=None):
>>         """
>>         Add a task exclusion to prevent a task running under certain
>>         circumstances.
>>         :param dag_id: The dag_id of the DAG containing the task to exclude
>>         from execution.
>>         :param task_id: The task_id of the task to exclude from execution.
>>         :param exclusion_type: The type of circumstances to exclude the task
>>         from execution under. See the TaskExclusionType class for more detail.
>>         :param exclusion_start_date: The execution_date to start excluding on.
>>         This will be ignored if the exclusion_type is INDEFINITE.
>>         :param exclusion_end_date: The execution_date to stop excluding on.
>>         This will be ignored if the exclusion_type is INDEFINITE or
>>         SINGLE_DATE.
>>         :param created_by: Who is creating this exclusion. Stored with the
>>         exclusion record for auditing/debugging purposes.
>>         :return: None.
>>         """
>>
>>         session.expunge_all()
>>
>>         # Set up execution date range correctly
>>         if exclusion_type == TaskExclusionType.SINGLE_DATE:
>>             if exclusion_start_date:
>>                 exclusion_end_date = exclusion_start_date
>>             else:
>>                 raise AirflowException(
>>                     "No exclusion_start_date "
>>                 )
>>         elif exclusion_type == TaskExclusionType.DATE_RANGE:
>>             if exclusion_start_date > exclusion_end_date:
>>                 raise AirflowException(
>>                     "The exclusion_start_date is after the exclusion_end_date"
>>                 )
>>         elif exclusion_type == TaskExclusionType.INDEFINITE:
>>             exclusion_start_date = None
>>             exclusion_end_date = None
>>         else:
>>             raise AirflowException(
>>                 "The exclusion_type, {}, is not recognised."
>>                 .format(exclusion_type)
>>             )
>>
>>         # remove any duplicate exclusions
>>         session.query(cls).filter(
>>             cls.dag_id == dag_id,
>>             cls.task_id == task_id,
>>             cls.exclusion_type == exclusion_type,
>>             cls.exclusion_start_date == exclusion_start_date,
>>             cls.exclusion_end_date == exclusion_end_date
>>         ).delete()
>>
>>         # insert new exclusion
>>         session.add(TaskExclusion(
>>             dag_id=dag_id,
>>             task_id=task_id,
>>             exclusion_type=exclusion_type,
>>             exclusion_start_date=exclusion_start_date,
>>             exclusion_end_date=exclusion_end_date,
>>             created_by=created_by,
>>             created_on=datetime.now())
>>         )
>>
>>         session.commit()
>>
>>
>> The unit test:
>>
>> class TaskExclusionTest(unittest.TestCase):
>>     def test_set_exclusion(self, session=None):
>>
>>         session = settings.Session()
>>
>>         session.expunge_all()
>>
>>         dag_id = 'test_task_exclude'
>>         task_id = 'test_task_exclude'
>>         exec_date = datetime.datetime.now()
>>
>>         TaskExclusion.set(dag_id=dag_id,
>>                           task_id=task_id,
>>                           exclusion_type=TaskExclusionType.SINGLE_DATE,
>>                           exclusion_start_date=exec_date,
>>                           exclusion_end_date=exec_date,
>>                           created_by='airflow')
>>
>>
>>         exclusion = session.query(TaskExclusion).filter(
>>             TaskExclusion.dag_id == dag_id,
>>             TaskExclusion.task_id == task_id,
>>             TaskExclusion.exclusion_type == TaskExclusionType.SINGLE_DATE,
>>             TaskExclusion.exclusion_start_date == exec_date,
>>             TaskExclusion.exclusion_end_date == exec_date).first()
>>
>>         self.assertTrue(exclusion)
>>
>>
>> The unit test passes for PostgreSQL and SQLite but fails for MySQL. I have
>> checked and the 'exclusion' variable contains a TaskExclusion object for
>> PostgreSQL and SQLite but is set to 'None' for MySQL. Any suggestions on
>> what could be causing this would be much appreciated.
>>
>>
>> Cheers,
>> Luke Maycock
>>
>>
>>
>> ________________________________
>> From: siddharth anand <[email protected]>
>> Sent: 16 November 2016 00:40
>> To: [email protected]
>> Subject: Re: Skip task
>>
>> If your requirement is to skip a portion of tasks in a DagRun based on some
>> state encountered while executing that DagRun, that is what
>> BranchPythonOperator or ShortCircuitOperator (optionally paired with a
>> Trigger Rule specified on a downstream task) is made for.
>>
>> These operators take a custom Python callable as an argument. The callable
>> can check for the existence of data or files that should have been
>> generated by an external system or an upstream task in the same DAG. The
>> callables need to return a Boolean value in the case of the
>> ShortCircuitOperator or a selected choice (i.e. branch to take) in the
>> case of the BranchPythonOperator.
>>
>> If you have 20 tasks that all depend on the presence of 20 different files,
>> you would need 20 ShortCircuitOperator or BranchPythonOperator tasks each
>> either sharing a common callable or each with its own callable.
>>
>> One could argue that these tasks are "overhead" because they just encompass
>> some conditional or control logic and that DAGs should only contain
>> workhorse tasks (i.e. tasks that do some work). DAGs with workhorse-only
>> tasks are more of a pure dataflow approach -- i.e. no control-logic
>> operators. However, I don't see another option.
>>
>> In the current system, a callable registered with a ShortCircuitOperator
>> would check for the presence of a file -- if the file were not available,
>> then a series of downstream tasks would be skipped in that DagRun, until a
>> task with trigger_rule="all_done" were encountered, downstream of which
>> tasks would no longer be skipped for the DagRun.
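
The skipping behaviour described in the paragraph above can be pictured with a
toy model. This is plain Python, not Airflow code: the `run_chain` function,
the task names and the strictly linear chain are all illustrative assumptions,
and real Airflow evaluates each task's trigger rule against all of its upstream
tasks.

```python
def run_chain(tasks, file_available):
    """Walk a linear chain of (name, kind) pairs and return each task's state.

    kind is "short_circuit", "all_success" (the default rule) or "all_done".
    """
    states = {}
    skipping = False
    for name, kind in tasks:
        if kind == "all_done":
            # Runs once upstream has finished, whether it succeeded or was skipped.
            skipping = False
        if skipping:
            states[name] = "skipped"
            continue
        states[name] = "success"
        if kind == "short_circuit" and not file_available:
            # The callable returned False: skip what follows.
            skipping = True
    return states


# Hypothetical task names for a file-loading workflow.
chain = [
    ("check_file", "short_circuit"),
    ("load_file", "all_success"),
    ("transform", "all_success"),
    ("notify", "all_done"),
    ("archive", "all_success"),
]
result = run_chain(chain, file_available=False)
print(result)
```

With the file missing, the two tasks after the check are skipped and
processing resumes at the task that uses the all_done rule.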
>>
>> I hope this makes sense.
>>
>> A long time ago, I proposed UI functionality to skip a series of DAG runs
>> via the UI, because I knew that no data was available for that time range
>> from an external system. I wanted to essentially specify a "blackout"
>> period in terms of a time range that covered multiple DagRuns. My intention
>> was for backfills to skip those days. It turns out that my company did not
>> end up having such a requirement, so I dropped the feature request.
>>
>> If this is what you are asking for, then I am +1. Please implement it and
>> submit a PR.
>>
>> On Tue, Nov 15, 2016 at 2:50 AM, Maycock, Luke <
>> [email protected]> wrote:
>>
>>> Thank you for taking the time to respond. This is a great approach if you
>>> know at the time of creating the DAG which tasks you expect to need to
>>> skip. However, I don't think this is exactly the use case I have. For
>>> example, I may be expecting a file to arrive in an FTP folder for loading
>>> into a database but one day it doesn't arrive so I just want to skip that
>>> task on that day.
>>>
>>>
>>> Our workflows commonly have around 20 of these types of tasks in. I could
>>> configure all of these tasks in the way you suggested in case I ever need
>>> to skip one of them. However, I'd prefer not to have to set the tasks up
>>> this way and instead have the ability just to skip a task on an ad-hoc
>>> basis. I could then also use this functionality to add the ability to run
>>> from a certain point in a DAG or to a certain point in the DAG.
>>>
>>>
>>>
>>> Thanks,
>>> Luke Maycock
>>>
>>>
>>>
>>> ________________________________
>>> From: siddharth anand <[email protected]>
>>> Sent: 14 November 2016 19:48
>>> To: [email protected]
>>> Subject: Re: Skip task
>>>
>>> For cases like this, we (Agari) use the following approach:
>>>
>>> 1. Create a Variable in the UI of type boolean such as *enable_feature_x*
>>> 2. Use a ShortCircuitOperator (or BranchPythonOperator) to skip
>>>    downstream processing based on the value of *enable_feature_x*
>>> 3. Assuming that you don't want to skip ALL downstream tasks, you can
>>>    use a trigger_rule of all_done to resume processing some portion of
>>>    your downstream DAG after skipping an upstream portion
>>>
>>> In other words, there is already a means to achieve what you are asking for
>>> today. You can change the value of *enable_feature_x* via the UI. If you'd
>>> like to enhance the UI to better capture this pattern, please submit a PR.
>>> -s
>>>
>>> On Thu, Nov 10, 2016 at 1:20 PM, Maycock, Luke <
>>> [email protected]> wrote:
>>>
>>>> Hi Gerard,
>>>>
>>>>
>>>> I see the new status as having a number of uses:
>>>>
>>>> 1.  A user can manually set a task to skip in a DAG run via the UI.
>>>> 2.  We can then make use of this new status to add the following
>>>>     functionality to Airflow:
>>>>   *   Run a DAG run up to a certain point and have the rest of the
>>>>       tasks have the new status.
>>>>   *   Run a DAG run from a certain task to the end, setting all
>>>>       pre-requisite tasks to have this new status.
>>>>
>>>> I am happy to be challenged on the above use cases if there are better
>>>> ways to achieve the same things.
>>>>
>>>> Cheers,
>>>> Luke Maycock
>>>>
>>>>
>>>>
>>>> ________________________________
>>>> From: Gerard Toonstra <[email protected]>
>>>> Sent: 09 November 2016 18:08
>>>> To: [email protected]
>>>> Subject: Re: Skip task
>>>>
>>>> Hey Luke,
>>>>
>>>> Who or what makes the decision to skip processing that task?
>>>>
>>>> Rgds,
>>>>
>>>> Gerard
>>>>
>>>> On Wed, Nov 9, 2016 at 2:39 PM, Maycock, Luke <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi Gerard,
>>>>>
>>>>>
>>>>> Thank you for your quick response.
>>>>>
>>>>>
>>>>> I am not trying to implement this for a specific operator but rather
>>>>> trying to add it as a feature for any task in any DAG.
>>>>>
>>>>>
>>>>> Given that the skipped states propagate where all directly upstream
>>>>> tasks are skipped, I don't think this is the state we want to use. For
>>>>> the functionality I'm looking for, I think I'll need to introduce a new
>>>>> status, maybe 'disabled'.
>>>>>
>>>>>
>>>>> Again, thanks for your response.
>>>>>
>>>>>
>>>>> Cheers,
>>>>> Luke Maycock
>>>>>
>>>>>
>>>>>
>>>>> ________________________________
>>>>> From: Gerard Toonstra <[email protected]>
>>>>> Sent: 08 November 2016 18:19
>>>>> To: [email protected]
>>>>> Subject: Re: Skip task
>>>>>
>>>>> Also in 1.7.1.3, there's the ShortCircuitOperator, which can give you
>>>>> an example.
>>>>>
>>>>> https://github.com/apache/incubator-airflow/blob/1.7.1.3/airflow/operators/python_operator.py
>>>>>
>>>>> You'd have to modify this to your needs, but the way it works is that
>>>>> if the condition evaluates to False, none of the downstream tasks are
>>>>> actually executed; they are skipped instead. The reason for putting
>>>>> them into the SKIPPED state is that the DAG's final result would still
>>>>> be SUCCESS and not FAILED.
>>>>>
>>>>> You could copy the operator from there and, instead of running the
>>>>> full "for loop", pick only the tasks immediately downstream of this
>>>>> operator and skip those. Or, if you need to skip additional tasks
>>>>> further downstream, add a parameter "num_tasks" that decides on a
>>>>> halting condition for the for loop.
>>>>>
>>>>> I believe that should work. I didn't try that here, but you can test
>>>>> that and see what it does for you.
>>>>>
>>>>>
>>>>> If you want this as a UI capability, for example having a human
>>>>> operator decide whether or not to skip a task, then maybe the best way
>>>>> forward would be some kind of highly custom plugin with its own view.
>>>>> In the end, you'd basically do the same action in the backend, whether
>>>>> the Python condition evaluates to False or the button is clicked.
>>>>>
>>>>> In the plugin case though, you'd have to keep the UI and the structure
>>>>> of the DAG in sync and aligned, otherwise it'd become a mess. Airflow
>>>>> wasn't really developed for workflows with human interaction, but for
>>>>> workflows where only automated processes are involved. That doesn't
>>>>> mean that you can't do anything like that, but it may be costly
>>>>> resource-wise to get this done. For example, on the basis of the
>>>>> BranchPythonOperator, you could call an external API to verify whether
>>>>> a decision was taken on a case, then follow branch A or B if the
>>>>> decision is there, or put the state back into UP_FOR_RETRY.
>>>>> At the moment though, there's no programmatic way to reschedule that
>>>>> task to some minutes or hours into the future before it's looked at
>>>>> again, unless you really dive into Airflow, its scheduling semantics
>>>>> (@once vs. other schedules) and how the scheduler works.
>>>>>
>>>>> Rgds,
>>>>>
>>>>> Gerard
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Nov 8, 2016 at 5:30 PM, Maycock, Luke <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>>
>>>>>> I am using Airflow 1.7.1.3 and have a particular requirement, which I
>>>>>> don't think is currently supported by Airflow but just wanted to
>>>>>> check in case I was missing something.
>>>>>>
>>>>>>
>>>>>> I occasionally wish to skip a particular task in a given DAG run such
>>>>>> that the task does not run for that DAG run. Is this functionality
>>>>>> available in Airflow?
>>>>>>
>>>>>>
>>>>>> I am aware of the BranchPythonOperator
>>>>>> (https://airflow.incubator.apache.org/concepts.html#branching) but I
>>>>>> don't believe this is exactly what I am looking for.
>>>>>>
>>>>>>
>>>>>> I am thinking that a button in the UI alongside the 'Mark Success'
>>>>>> and 'Run' buttons would be appropriate.
>>>>>>
>>>>>>
>>>>>> If the functionality does not exist, does anyone have any suggestions
>>>>>> on ways to implement this?
>>>>>>
>>>>>>
>>>>>> Cheers,
>>>>>> Luke Maycock
>>>>>>
>>>>>>
>>>>>> ________________________________
>>>>>> This e-mail and any attachments may be confidential or legally
>>>>>> privileged. If you received this message in error or are not the
>>>>>> intended recipient, you should destroy the e-mail message and any
>>>>>> attachments or copies, and you are prohibited from retaining,
>>>>>> distributing, disclosing or using any information contained herein.
>>>>>> Please inform us of the erroneous delivery by return e-mail. Thank you
>>>>>> for your cooperation.
>>>>>>


