kaxil commented on a change in pull request #5145: [AIRFLOW-4379] Remove duplicate code & Add validation in gcs_to_gcs.py
URL: https://github.com/apache/airflow/pull/5145#discussion_r277262562
 
 

 ##########
 File path: airflow/contrib/operators/gcs_to_gcs.py
 ##########
 @@ -143,51 +143,50 @@ def execute(self, context):
             google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
             delegate_to=self.delegate_to
         )
-        log_message = 'Executing copy of gs://{0}/{1} to gs://{2}/{3}'
+
+        if self.destination_bucket is None:
+            self.log.warning(
+                'destination_bucket is None. Defaulting it to source_bucket 
(%s)',
+                self.source_bucket)
+            self.destination_bucket = self.source_bucket
 
         if self.wildcard in self.source_object:
+
+            if self.source_object.count(self.wildcard) > 1:
+                raise Exception("You can only use one wildcard in 
source_object parameter.")
+
             prefix, delimiter = self.source_object.split(self.wildcard, 1)
             objects = hook.list(self.source_bucket, prefix=prefix, 
delimiter=delimiter)
 
             for source_object in objects:
-                if self.last_modified_time is not None:
-                    # Check to see if object was modified after 
last_modified_time
-                    if hook.is_updated_after(self.source_bucket, source_object,
-                                             self.last_modified_time):
-                        pass
-                    else:
-                        continue
                 if self.destination_object is None:
                     destination_object = source_object
                 else:
                     destination_object = source_object.replace(prefix,
                                                                
self.destination_object, 1)
-                self.log.info(
-                    log_message.format(self.source_bucket, source_object,
-                                       self.destination_bucket, 
destination_object)
-                )
-
-                hook.rewrite(self.source_bucket, source_object,
-                             self.destination_bucket, destination_object)
-                if self.move_object:
-                    hook.delete(self.source_bucket, source_object)
 
+                self._copy_single_object(hook=hook, 
source_object=source_object,
+                                         destination_object=destination_object)
         else:
-            if self.last_modified_time is not None:
-                if hook.is_updated_after(self.source_bucket,
-                                         self.source_object,
-                                         self.last_modified_time):
-                    pass
-                else:
-                    return
-
-            self.log.info(
-                log_message.format(self.source_bucket, self.source_object,
-                                   self.destination_bucket or 
self.source_bucket,
-                                   self.destination_object or 
self.source_object)
-            )
-            hook.rewrite(self.source_bucket, self.source_object,
-                         self.destination_bucket, self.destination_object)
-
-            if self.move_object:
-                hook.delete(self.source_bucket, self.source_object)
+            self._copy_single_object(hook=hook, 
source_object=self.source_object,
+                                     
destination_object=self.destination_object)
+
+    def _copy_single_object(self, hook, source_object, destination_object):
+        if self.last_modified_time is not None:
 
 Review comment:
   Updated.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to