kaxil commented on a change in pull request #5145: [AIRFLOW-4379] Remove
duplicate code & Add validation in gcs_to_gcs.py
URL: https://github.com/apache/airflow/pull/5145#discussion_r277262562
##########
File path: airflow/contrib/operators/gcs_to_gcs.py
##########
@@ -143,51 +143,50 @@ def execute(self, context):
google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to
)
- log_message = 'Executing copy of gs://{0}/{1} to gs://{2}/{3}'
+
+ if self.destination_bucket is None:
+ self.log.warning(
+ 'destination_bucket is None. Defaulting it to source_bucket
(%s)',
+ self.source_bucket)
+ self.destination_bucket = self.source_bucket
if self.wildcard in self.source_object:
+
+ if self.source_object.count(self.wildcard) > 1:
+ raise Exception("You can only use one wildcard in
source_object parameter.")
+
prefix, delimiter = self.source_object.split(self.wildcard, 1)
objects = hook.list(self.source_bucket, prefix=prefix,
delimiter=delimiter)
for source_object in objects:
- if self.last_modified_time is not None:
- # Check to see if object was modified after
last_modified_time
- if hook.is_updated_after(self.source_bucket, source_object,
- self.last_modified_time):
- pass
- else:
- continue
if self.destination_object is None:
destination_object = source_object
else:
destination_object = source_object.replace(prefix,
self.destination_object, 1)
- self.log.info(
- log_message.format(self.source_bucket, source_object,
- self.destination_bucket,
destination_object)
- )
-
- hook.rewrite(self.source_bucket, source_object,
- self.destination_bucket, destination_object)
- if self.move_object:
- hook.delete(self.source_bucket, source_object)
+ self._copy_single_object(hook=hook,
source_object=source_object,
+ destination_object=destination_object)
else:
- if self.last_modified_time is not None:
- if hook.is_updated_after(self.source_bucket,
- self.source_object,
- self.last_modified_time):
- pass
- else:
- return
-
- self.log.info(
- log_message.format(self.source_bucket, self.source_object,
- self.destination_bucket or
self.source_bucket,
- self.destination_object or
self.source_object)
- )
- hook.rewrite(self.source_bucket, self.source_object,
- self.destination_bucket, self.destination_object)
-
- if self.move_object:
- hook.delete(self.source_bucket, self.source_object)
+ self._copy_single_object(hook=hook,
source_object=self.source_object,
+
destination_object=self.destination_object)
+
+ def _copy_single_object(self, hook, source_object, destination_object):
+ if self.last_modified_time is not None:
Review comment:
Updated.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services