Repository: incubator-airflow Updated Branches: refs/heads/master eff68882b -> d8115e982
# [AIRFLOW-1843] Add Google Cloud Storage Sensor with prefix Sensor for
# checking if there any files in bucket at certain prefix
# Closes #2809 from litdeviant/gcs_prefix_sensor
#
# Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
# Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d8115e98
# Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d8115e98
# Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d8115e98
# Branch: refs/heads/master
# Commit: d8115e982b19e39fb96362c1acb3bf1715bc9180
# Parents: eff6888
# Author: Igors Vaitkus <[email protected]>
# Authored: Mon Nov 27 11:35:01 2017 -0800
# Committer: Chris Riccomini <[email protected]>
# Committed: Mon Nov 27 11:35:01 2017 -0800
#
# airflow/contrib/sensors/gcs_sensor.py | 45 ++++++++++++++++++++++++++++++
# 1 file changed, 45 insertions(+)
#
# http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8115e98/airflow/contrib/sensors/gcs_sensor.py
# diff --git a/airflow/contrib/sensors/gcs_sensor.py b/airflow/contrib/sensors/gcs_sensor.py
# index 384e26f..a45923a 100644
# --- a/airflow/contrib/sensors/gcs_sensor.py
# +++ b/airflow/contrib/sensors/gcs_sensor.py
# @@ -121,3 +121,48 @@ class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator):
#              google_cloud_storage_conn_id=self.google_cloud_conn_id,
#              delegate_to=self.delegate_to)
#          return hook.is_updated_after(self.bucket, self.object, self.ts_func(context))


class GoogleCloudStoragePrefixSensor(BaseSensorOperator):
    """
    Checks for the existence of any objects under a prefix in a Google Cloud
    Storage bucket.

    ``poke`` lists the bucket restricted to ``prefix`` through
    ``GoogleCloudStorageHook.list`` and reports ``True`` when that listing is
    truthy (i.e. not empty).
    """
    # ``bucket`` and ``prefix`` are declared as template fields.
    template_fields = ('bucket', 'prefix')
    ui_color = '#f0eee4'

    @apply_defaults
    def __init__(
            self,
            bucket,
            prefix,
            google_cloud_conn_id='google_cloud_storage_default',
            delegate_to=None,
            *args,
            **kwargs):
        """
        Create a new GoogleCloudStoragePrefixSensor.

        :param bucket: The Google cloud storage bucket to look in.
        :type bucket: string
        :param prefix: The name of the prefix to check in the Google cloud
            storage bucket.
        :type prefix: string
        :param google_cloud_conn_id: The connection ID to use when
            connecting to Google cloud storage.
        :type google_cloud_conn_id: string
        :param delegate_to: The account to impersonate, if any.
            For this to work, the service account making the request must
            have domain-wide delegation enabled.
        :type delegate_to: string
        """
        # Remaining positional/keyword arguments are forwarded untouched to
        # the base sensor operator.
        super(GoogleCloudStoragePrefixSensor, self).__init__(*args, **kwargs)
        self.bucket = bucket
        self.prefix = prefix
        self.google_cloud_conn_id = google_cloud_conn_id
        self.delegate_to = delegate_to

    def poke(self, context):
        """
        Check whether anything is listed under ``self.prefix`` in
        ``self.bucket``.

        A new ``GoogleCloudStorageHook`` is built on every call from the
        stored connection ID and ``delegate_to`` value.

        :param context: The context handed in by the caller; it is not read
            by this method.
        :return: ``True`` when ``hook.list`` returns a truthy (non-empty)
            result for the bucket/prefix pair, ``False`` otherwise.
        :rtype: bool
        """
        self.log.info('Sensor checks existence of objects: %s, %s',
                      self.bucket, self.prefix)
        hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.google_cloud_conn_id,
            delegate_to=self.delegate_to)
        # bool() collapses whatever the hook returns into a plain True/False
        # so an empty listing reads as "not there yet".
        return bool(hook.list(self.bucket, prefix=self.prefix))
