This isn't possible with Sensors as they exist right now. Our solution was to 
not make it a sensor, but just a normal operator, and set a number of retries 
and a large retry_delay:

checkTask = FindTriggerFileForExecutionPeriod(
    task_id="trigger_file_for_period",
    s3_conn_id="s3_default",
    prefix=source_s3_uri,
    dag=dag,

    # Try every 6 hours for 3 days, just in case the trigger is delayed.
    retries=12,
    retry_delay=timedelta(hours=6)
)

When it fails it shows up as "Up for retry" (Yellow) in the scheduler but 
doesn't take up a slot.

The operator is defined as:

class FindTriggerFileForExecutionPeriod(BaseOperator):
    """
    Find any trigger file for the execution period.

    In the unlikely event that there are multiple trigger files the most recent
    (by date in filename, not mtime) will be returned.
    """
    trigger_file_wildcard = 'xxx*_*.gz.trigger'

    @apply_defaults
    def __init__(self, s3_conn_id, prefix, **kwargs):
        self.s3_conn_id = s3_conn_id
        self.prefix = prefix
        super().__init__(**kwargs)

    def execute(self, context):
        hook = S3Hook(self.s3_conn_id)
        (bucket_name, prefix) = hook.parse_s3_url(self.prefix)
        min_date = context['execution_date']
        max_date = context['next_execution_date']

        for key in sorted(hook.list_keys(bucket_name, prefix), reverse=True):
            if not fnmatch.fnmatch(basename(key), self.trigger_file_wildcard):
                continue
            when = _tapad_key_content_date(key)

            # We have sorted the keys, so if we get past the range we are
            # interested in we're not going to find anything.
            if when < min_date:
                break

            if when < max_date:
                self.xcom_push(context, "content_date", 
_generate_iso_prefix(when))
                return "s3://{}/{}".format(bucket_name, key)

        raise RuntimeError("No trigger file found with filename between in 
interval ({start}, {end}]".format(
            start=min_date,
            end=max_date,
        ))






> On 18 Oct 2017, at 13:58, Cieplucha, Michal <[email protected]> 
> wrote:
> 
> Hello,
> 
> We are using sensors for lightweight but taking long time tasks. It's like 
> monitoring results of some testing executed on remote machine. We would like 
> to see such a task in a DAG but with so much tasks/dags in running state we 
> will hit max number of processes/dag runs (defined in airflow.cfg) soon. Is 
> it possible to have a sensor, which instead of sleep and keep the process 
> would just exit and be rescheduled later? Are we using sensors in wrong 
> manner?
> 
> Thanks
> mC
> 
> ---------------------------------------------------------------------------------------------------------------------------------
> I am an Intel employee. All comments and opinions are my own and do not 
> represent the views of Intel.
> --------------------------------------------------------------------
> 
> Intel Technology Poland sp. z o.o.
> ul. Slowackiego 173 | 80-298 Gdansk | Sad Rejonowy Gdansk Polnoc | VII 
> Wydzial Gospodarczy Krajowego Rejestru Sadowego - KRS 101882 | NIP 
> 957-07-52-316 | Kapital zakladowy 200.000 PLN.
> 
> Ta wiadomosc wraz z zalacznikami jest przeznaczona dla okreslonego adresata i 
> moze zawierac informacje poufne. W razie przypadkowego otrzymania tej 
> wiadomosci, prosimy o powiadomienie nadawcy oraz trwale jej usuniecie; 
> jakiekolwiek
> przegladanie lub rozpowszechnianie jest zabronione.
> This e-mail and any attachments may contain confidential material for the 
> sole use of the intended recipient(s). If you are not the intended recipient, 
> please contact the sender and delete all copies; any review or distribution by
> others is strictly prohibited.
> 

Reply via email to