[ 
https://issues.apache.org/jira/browse/AIRFLOW-3155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16650023#comment-16650023
 ] 

ASF GitHub Bot commented on AIRFLOW-3155:
-----------------------------------------

kaxil closed pull request #4008: [AIRFLOW-3155] Add ability to filter by a last 
modified time in GoogleCloudStorageToGoogleCloudStorageOperator
URL: https://github.com/apache/incubator-airflow/pull/4008
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a fork's pull request once it is merged, the diff is
reproduced below for the sake of provenance:

diff --git a/airflow/contrib/operators/gcs_to_gcs.py 
b/airflow/contrib/operators/gcs_to_gcs.py
index 12fbff5276..0e1087e4d2 100644
--- a/airflow/contrib/operators/gcs_to_gcs.py
+++ b/airflow/contrib/operators/gcs_to_gcs.py
@@ -62,6 +62,10 @@ class 
GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator):
         For this to work, the service account making the request must have
         domain-wide delegation enabled.
     :type delegate_to: str
+    :param last_modified_time: When specified, if the object(s) were
+        modified after last_modified_time, they will be copied/moved.
+        If tzinfo has not been set, UTC will be assumed.
+    :type last_modified_time: datetime
 
     **Examples**:
         The following Operator would copy a single file named
@@ -114,6 +118,7 @@ def __init__(self,
                  move_object=False,
                  google_cloud_storage_conn_id='google_cloud_default',
                  delegate_to=None,
+                 last_modified_time=None,
                  *args,
                  **kwargs):
         super(GoogleCloudStorageToGoogleCloudStorageOperator,
@@ -125,6 +130,7 @@ def __init__(self,
         self.move_object = move_object
         self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
         self.delegate_to = delegate_to
+        self.last_modified_time = last_modified_time
         self.wildcard = '*'
 
     def execute(self, context):
@@ -140,6 +146,13 @@ def execute(self, context):
             objects = hook.list(self.source_bucket, prefix=prefix, 
delimiter=delimiter)
 
             for source_object in objects:
+                if self.last_modified_time is not None:
+                    # Check to see if object was modified after 
last_modified_time
+                    if hook.is_updated_after(self.source_bucket, source_object,
+                                             self.last_modified_time):
+                        pass
+                    else:
+                        continue
                 if self.destination_object is None:
                     destination_object = source_object
                 else:
@@ -156,6 +169,14 @@ def execute(self, context):
                     hook.delete(self.source_bucket, source_object)
 
         else:
+            if self.last_modified_time is not None:
+                if hook.is_updated_after(self.source_bucket,
+                                         self.source_object,
+                                         self.last_modified_time):
+                    pass
+                else:
+                    return
+
             self.log.info(
                 log_message.format(self.source_bucket, self.source_object,
                                    self.destination_bucket or 
self.source_bucket,
diff --git a/tests/contrib/operators/test_gcs_to_gcs_operator.py 
b/tests/contrib/operators/test_gcs_to_gcs_operator.py
index 6b866d11e1..dd16e2f2df 100644
--- a/tests/contrib/operators/test_gcs_to_gcs_operator.py
+++ b/tests/contrib/operators/test_gcs_to_gcs_operator.py
@@ -18,6 +18,7 @@
 # under the License.
 
 import unittest
+from datetime import datetime
 
 from airflow.contrib.operators.gcs_to_gcs import \
     GoogleCloudStorageToGoogleCloudStorageOperator
@@ -38,6 +39,7 @@
 SOURCE_OBJECT_2 = 'test_object*'
 SOURCE_OBJECT_3 = 'test*object'
 SOURCE_OBJECT_4 = 'test_object*.txt'
+SOURCE_OBJECT_5 = 'test_object.txt'
 DESTINATION_BUCKET = 'archive'
 DESTINATION_OBJECT_PREFIX = 'foo/bar'
 SOURCE_FILES_LIST = [
@@ -45,6 +47,7 @@
     'test_object/file2.txt',
     'test_object/file3.json',
 ]
+MOD_TIME_1 = datetime(2016, 1, 1)
 
 
 class GoogleCloudStorageToCloudStorageOperatorTest(unittest.TestCase):
@@ -167,3 +170,113 @@ def test_execute_wildcard_empty_destination_object(self, 
mock_hook):
                       DESTINATION_BUCKET, '/file2.txt'),
         ]
         mock_hook.return_value.rewrite.assert_has_calls(mock_calls_empty)
+
+    @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook')
+    def test_execute_last_modified_time(self, mock_hook):
+        mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
+        operator = GoogleCloudStorageToGoogleCloudStorageOperator(
+            task_id=TASK_ID, source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_4,
+            destination_bucket=DESTINATION_BUCKET,
+            last_modified_time=None)
+
+        operator.execute(None)
+        mock_calls_none = [
+            mock.call(TEST_BUCKET, 'test_object/file1.txt',
+                      DESTINATION_BUCKET, 'test_object/file1.txt'),
+            mock.call(TEST_BUCKET, 'test_object/file2.txt',
+                      DESTINATION_BUCKET, 'test_object/file2.txt'),
+        ]
+        mock_hook.return_value.rewrite.assert_has_calls(mock_calls_none)
+
+    @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook')
+    def test_wc_with_last_modified_time_with_all_true_cond(self, mock_hook):
+        mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
+        mock_hook.return_value.is_updated_after.side_effect = [True, True, 
True]
+        operator = GoogleCloudStorageToGoogleCloudStorageOperator(
+            task_id=TASK_ID, source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_4,
+            destination_bucket=DESTINATION_BUCKET,
+            last_modified_time=MOD_TIME_1)
+
+        operator.execute(None)
+        mock_calls_none = [
+            mock.call(TEST_BUCKET, 'test_object/file1.txt',
+                      DESTINATION_BUCKET, 'test_object/file1.txt'),
+            mock.call(TEST_BUCKET, 'test_object/file2.txt',
+                      DESTINATION_BUCKET, 'test_object/file2.txt'),
+        ]
+        mock_hook.return_value.rewrite.assert_has_calls(mock_calls_none)
+
+    @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook')
+    def test_wc_with_last_modified_time_with_one_true_cond(self, mock_hook):
+        mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
+        mock_hook.return_value.is_updated_after.side_effect = [True, False, 
False]
+        operator = GoogleCloudStorageToGoogleCloudStorageOperator(
+            task_id=TASK_ID, source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_4,
+            destination_bucket=DESTINATION_BUCKET,
+            last_modified_time=MOD_TIME_1)
+
+        operator.execute(None)
+        mock_hook.return_value.rewrite.assert_called_once_with(
+            TEST_BUCKET, 'test_object/file1.txt',
+            DESTINATION_BUCKET, 'test_object/file1.txt')
+
+    @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook')
+    def test_wc_with_no_last_modified_time(self, mock_hook):
+        mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
+        operator = GoogleCloudStorageToGoogleCloudStorageOperator(
+            task_id=TASK_ID, source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_4,
+            destination_bucket=DESTINATION_BUCKET,
+            last_modified_time=None)
+
+        operator.execute(None)
+        mock_calls_none = [
+            mock.call(TEST_BUCKET, 'test_object/file1.txt',
+                      DESTINATION_BUCKET, 'test_object/file1.txt'),
+            mock.call(TEST_BUCKET, 'test_object/file2.txt',
+                      DESTINATION_BUCKET, 'test_object/file2.txt'),
+        ]
+        mock_hook.return_value.rewrite.assert_has_calls(mock_calls_none)
+
+    @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook')
+    def test_no_prefix_with_last_modified_time_with_true_cond(self, mock_hook):
+        mock_hook.return_value.is_updated_after.return_value = True
+        operator = GoogleCloudStorageToGoogleCloudStorageOperator(
+            task_id=TASK_ID, source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_5,
+            destination_bucket=DESTINATION_BUCKET,
+            destination_object=SOURCE_OBJECT_5,
+            last_modified_time=MOD_TIME_1)
+
+        operator.execute(None)
+        mock_hook.return_value.rewrite.assert_called_once_with(
+            TEST_BUCKET, 'test_object.txt', DESTINATION_BUCKET, 
'test_object.txt')
+
+    @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook')
+    def test_execute_no_prefix_with_no_last_modified_time(self, mock_hook):
+        operator = GoogleCloudStorageToGoogleCloudStorageOperator(
+            task_id=TASK_ID, source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_5,
+            destination_bucket=DESTINATION_BUCKET,
+            destination_object=SOURCE_OBJECT_5,
+            last_modified_time=None)
+
+        operator.execute(None)
+        mock_hook.return_value.rewrite.assert_called_once_with(
+            TEST_BUCKET, 'test_object.txt', DESTINATION_BUCKET, 
'test_object.txt')
+
+    @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook')
+    def test_no_prefix_with_last_modified_time_with_false_cond(self, 
mock_hook):
+        mock_hook.return_value.is_updated_after.return_value = False
+        operator = GoogleCloudStorageToGoogleCloudStorageOperator(
+            task_id=TASK_ID, source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_5,
+            destination_bucket=DESTINATION_BUCKET,
+            destination_object=SOURCE_OBJECT_5,
+            last_modified_time=MOD_TIME_1)
+
+        operator.execute(None)
+        mock_hook.return_value.rewrite.assert_not_called()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add ability to filter by a last modified time in 
> GoogleCloudStorageToGoogleCloudStorageOperator
> -----------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-3155
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3155
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: gcp
>    Affects Versions: 2.0.0
>            Reporter: Brandon Kvarda
>            Assignee: Brandon Kvarda
>            Priority: Minor
>             Fix For: 2.0.0
>
>
> Currently, the GoogleCloudStorageToGoogleCloudStorageOperator doesn't support
> filtering objects based on a last-modified time/date. This change would add
> the ability to further filter the source object(s) to copy/move based on a
> last-modified time threshold (for example, if the objects were updated after
> the last run at 10:00 yesterday, copy/move them; otherwise, do not).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to