Repository: incubator-airflow Updated Branches: refs/heads/master f939d7832 -> 9df4789a9
[AIRFLOW-717] Add Cloud Storage updated sensor Add a Cloud Storage sensor that triggers when an object is created or updated after a specific date. Allow setting a callback that defines the update requirements. The default is execution_date + schedule_interval. Closes #1959 from alexvanboxel/feature/gcp_sensor_updated Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9df4789a Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9df4789a Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9df4789a Branch: refs/heads/master Commit: 9df4789a9c30ba4ddfc94a01feb2db61743f9334 Parents: f939d78 Author: Alex Van Boxel <[email protected]> Authored: Tue Jan 3 20:24:07 2017 +0100 Committer: Alex Van Boxel <[email protected]> Committed: Tue Jan 3 20:24:07 2017 +0100 ---------------------------------------------------------------------- airflow/contrib/hooks/gcs_hook.py | 43 +++++++++++++++++++ airflow/contrib/sensors/gcs_sensor.py | 66 +++++++++++++++++++++++++++++- 2 files changed, 107 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9df4789a/airflow/contrib/hooks/gcs_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py index a728559..dd3cd27 100644 --- a/airflow/contrib/hooks/gcs_hook.py +++ b/airflow/contrib/hooks/gcs_hook.py @@ -43,6 +43,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook): http_authorized = self._authorize() return build('storage', 'v1', http=http_authorized) + # pylint:disable=redefined-builtin def download(self, bucket, object, filename=False): """ Get a file from Google Cloud Storage. 
@@ -68,6 +69,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook): return downloaded_file_bytes + # pylint:disable=redefined-builtin def upload(self, bucket, object, filename, mime_type='application/octet-stream'): """ Uploads a local file to Google Cloud Storage. @@ -88,6 +90,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook): .insert(bucket=bucket, name=object, media_body=media) \ .execute() + # pylint:disable=redefined-builtin def exists(self, bucket, object): """ Checks for the existence of a file in Google Cloud Storage. @@ -109,3 +112,43 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook): if ex.resp['status'] == '404': return False raise + + # pylint:disable=redefined-builtin + def is_updated_after(self, bucket, object, ts): + """ + Checks if an object is updated in Google Cloud Storage. + + :param bucket: The Google cloud storage bucket where the object is. + :type bucket: string + :param object: The name of the object to check in the Google cloud + storage bucket. + :type object: string + :param ts: The timestamp to check against. 
+ :type ts: datetime + """ + service = self.get_conn() + try: + response = (service + .objects() + .get(bucket=bucket, object=object) + .execute()) + + if 'updated' in response: + import dateutil.parser + import dateutil.tz + + if not ts.tzinfo: + ts = ts.replace(tzinfo=dateutil.tz.tzutc()) + + updated = dateutil.parser.parse(response['updated']) + logging.log(logging.INFO, "Verify object date: " + str(updated) + + " > " + str(ts)) + + if updated > ts: + return True + + except errors.HttpError as ex: + if ex.resp['status'] != '404': + raise + + return False http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9df4789a/airflow/contrib/sensors/gcs_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/gcs_sensor.py b/airflow/contrib/sensors/gcs_sensor.py index 2d5f6af..c9d741b 100644 --- a/airflow/contrib/sensors/gcs_sensor.py +++ b/airflow/contrib/sensors/gcs_sensor.py @@ -30,13 +30,13 @@ class GoogleCloudStorageObjectSensor(BaseSensorOperator): def __init__( self, bucket, - object, + object, # pylint:disable=redefined-builtin google_cloud_conn_id='google_cloud_storage_default', delegate_to=None, *args, **kwargs): """ - Create a new GoogleCloudStorageDownloadOperator. + Create a new GoogleCloudStorageObjectSensor. :param bucket: The Google cloud storage bucket where the object is. :type bucket: string @@ -62,3 +62,65 @@ class GoogleCloudStorageObjectSensor(BaseSensorOperator): google_cloud_storage_conn_id=self.google_cloud_conn_id, delegate_to=self.delegate_to) return hook.exists(self.bucket, self.object) + + +def ts_function(context): + """ + Default callback for the GoogleCloudStorageObjectUpdatedSensor. The default + behaviour is check for the object being updated after execution_date + + schedule_interval. 
+ """ + return context['execution_date'] + context['dag'].schedule_interval + + +class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator): + """ + Checks if an object is updated in Google Cloud Storage. + """ + template_fields = ('bucket', 'object') + template_ext = ('.sql',) + ui_color = '#f0eee4' + + @apply_defaults + def __init__( + self, + bucket, + object, # pylint:disable=redefined-builtin + ts_func=ts_function, + google_cloud_conn_id='google_cloud_storage_default', + delegate_to=None, + *args, + **kwargs): + """ + Create a new GoogleCloudStorageObjectUpdatedSensor. + + :param bucket: The Google cloud storage bucket where the object is. + :type bucket: string + :param object: The name of the object to check in the Google cloud + storage bucket. + :type object: string + :param ts_func: Callback for defining the update condition. The default callback + returns execution_date + schedule_interval. The callback takes the context + as its parameter. + :type ts_func: function + :param google_cloud_conn_id: The connection ID to use when + connecting to Google cloud storage. + :type google_cloud_conn_id: string + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have domain-wide + delegation enabled. + :type delegate_to: string + """ + super(GoogleCloudStorageObjectUpdatedSensor, self).__init__(*args, **kwargs) + self.bucket = bucket + self.object = object + self.ts_func = ts_func + self.google_cloud_conn_id = google_cloud_conn_id + self.delegate_to = delegate_to + + def poke(self, context): + logging.info('Sensor checks existence of : %s, %s', self.bucket, self.object) + hook = GoogleCloudStorageHook( + google_cloud_storage_conn_id=self.google_cloud_conn_id, + delegate_to=self.delegate_to) + return hook.is_updated_after(self.bucket, self.object, self.ts_func(context))
