Repository: incubator-airflow Updated Branches: refs/heads/master c49d0b36b -> 61370fbf7
[AIRFLOW-588] Add Google Cloud Storage Object sensor[] The Cloud Storage sensor will check for the existence of an object in a bucket. It will wait till the object exists before continuing. Closes #1849 from alexvanboxel/feature/airflow-588 -gcs-sensor Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/61370fbf Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/61370fbf Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/61370fbf Branch: refs/heads/master Commit: 61370fbf7c1a7679d45b70ffa3ca438c815ad56a Parents: c49d0b3 Author: Alex Van Boxel <a...@vanboxel.be> Authored: Tue Oct 25 10:34:34 2016 -0700 Committer: Chris Riccomini <chr...@wepay.com> Committed: Tue Oct 25 10:34:34 2016 -0700 ---------------------------------------------------------------------- airflow/contrib/hooks/gcs_hook.py | 29 +++++++++++++- airflow/contrib/sensors/gcs_sensor.py | 64 ++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/61370fbf/airflow/contrib/hooks/gcs_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py index 0bbecba..a728559 100644 --- a/airflow/contrib/hooks/gcs_hook.py +++ b/airflow/contrib/hooks/gcs_hook.py @@ -15,9 +15,11 @@ import logging -from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook from apiclient.discovery import build from apiclient.http import MediaFileUpload +from googleapiclient import errors + +from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook logging.getLogger("google_cloud_storage").setLevel(logging.INFO) @@ -31,7 +33,8 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook): def __init__(self, 
google_cloud_storage_conn_id='google_cloud_storage_default', delegate_to=None): - super(GoogleCloudStorageHook, self).__init__(google_cloud_storage_conn_id, delegate_to) + super(GoogleCloudStorageHook, self).__init__(google_cloud_storage_conn_id, + delegate_to) def get_conn(self): """ @@ -84,3 +87,25 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook): .objects() \ .insert(bucket=bucket, name=object, media_body=media) \ .execute() + + def exists(self, bucket, object): + """ + Checks for the existence of a file in Google Cloud Storage. + + :param bucket: The Google cloud storage bucket where the object is. + :type bucket: string + :param object: The name of the object to check in the Google cloud + storage bucket. + :type object: string + """ + service = self.get_conn() + try: + service \ + .objects() \ + .get(bucket=bucket, object=object) \ + .execute() + return True + except errors.HttpError as ex: + if ex.resp['status'] == '404': + return False + raise http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/61370fbf/airflow/contrib/sensors/gcs_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/gcs_sensor.py b/airflow/contrib/sensors/gcs_sensor.py new file mode 100644 index 0000000..2d5f6af --- /dev/null +++ b/airflow/contrib/sensors/gcs_sensor.py @@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import logging

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class GoogleCloudStorageObjectSensor(BaseSensorOperator):
    """
    Checks for the existence of a file in Google Cloud Storage.

    Each poke asks ``GoogleCloudStorageHook.exists`` whether the configured
    object is present in the configured bucket and returns that answer.
    """
    # Both attributes are listed as template fields of the operator.
    template_fields = ('bucket', 'object')
    ui_color = '#f0eee4'

    @apply_defaults
    def __init__(
            self,
            bucket,
            object,
            google_cloud_conn_id='google_cloud_storage_default',
            delegate_to=None,
            *args,
            **kwargs):
        """
        Create a new GoogleCloudStorageObjectSensor.

        :param bucket: The Google cloud storage bucket where the object is.
        :type bucket: string
        :param object: The name of the object to check in the Google cloud
            storage bucket.
        :type object: string
        :param google_cloud_conn_id: The connection ID to use when
            connecting to Google cloud storage.
        :type google_cloud_conn_id: string
        :param delegate_to: The account to impersonate, if any.
            For this to work, the service account making the request must
            have domain-wide delegation enabled.
        :type delegate_to: string
        """
        super(GoogleCloudStorageObjectSensor, self).__init__(*args, **kwargs)
        self.bucket = bucket
        self.object = object
        self.google_cloud_conn_id = google_cloud_conn_id
        self.delegate_to = delegate_to

    def poke(self, context):
        """
        Check once whether the object currently exists in the bucket.

        :param context: The task context handed to the sensor; it is not
            used by this check.
        :return: The result of ``GoogleCloudStorageHook.exists`` for the
            configured bucket and object.
        """
        logging.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
        # The sensor's ``google_cloud_conn_id`` is passed to the hook under the
        # hook's own keyword name, ``google_cloud_storage_conn_id``.
        hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.google_cloud_conn_id,
            delegate_to=self.delegate_to)
        return hook.exists(self.bucket, self.object)