This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new e324b37  Add job name and progress logs to Cloud Storage Transfer Hook (#12014)
e324b37 is described below

commit e324b37a67e32c368df50604a00160d7766b5c33
Author: Joshua Carp <[email protected]>
AuthorDate: Mon Nov 2 19:00:34 2020 -0500

    Add job name and progress logs to Cloud Storage Transfer Hook (#12014)
---
 .../cloud/hooks/cloud_storage_transfer_service.py  | 18 ++++----
 .../sensors/cloud_storage_transfer_service.py      | 10 ++++-
 .../hooks/test_cloud_storage_transfer_service.py   | 48 +++++++++++++++++++---
 .../sensors/test_cloud_storage_transfer_service.py | 46 +++++++++++++++++++--
 4 files changed, 104 insertions(+), 18 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py b/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py
index 0e30cb0..b7aae10 100644
--- a/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py
+++ b/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py
@@ -62,7 +62,7 @@ AWS_ACCESS_KEY = "awsAccessKey"
 AWS_S3_DATA_SOURCE = 'awsS3DataSource'
 BODY = 'body'
 BUCKET_NAME = 'bucketName'
-JOB_NAME = 'name'
+COUNTERS = 'counters'
 DAY = 'day'
 DESCRIPTION = "description"
 FILTER = 'filter'
@@ -72,6 +72,7 @@ GCS_DATA_SINK = 'gcsDataSink'
 GCS_DATA_SOURCE = 'gcsDataSource'
 HOURS = "hours"
 HTTP_DATA_SOURCE = 'httpDataSource'
+JOB_NAME = 'name'
 LIST_URL = 'list_url'
 METADATA = 'metadata'
 MINUTES = "minutes"
@@ -89,8 +90,8 @@ START_TIME_OF_DAY = 'startTimeOfDay'
 STATUS = "status"
 STATUS1 = 'status'
 TRANSFER_JOB = 'transfer_job'
-TRANSFER_JOB_FIELD_MASK = 'update_transfer_job_field_mask'
 TRANSFER_JOBS = 'transferJobs'
+TRANSFER_JOB_FIELD_MASK = 'update_transfer_job_field_mask'
 TRANSFER_OPERATIONS = 'transferOperations'
 TRANSFER_OPTIONS = 'transfer_options'
 TRANSFER_SPEC = 'transferSpec'
@@ -193,6 +194,7 @@ class CloudDataTransferServiceHook(GoogleBaseHook):
                     return self.enable_transfer_job(job_name=job_name, project_id=body.get(PROJECT_ID))
             else:
                 raise e
+        self.log.info("Created job %s", transfer_job[NAME])
         return transfer_job
 
     @GoogleBaseHook.fallback_to_default_project_id
@@ -467,13 +469,13 @@ class CloudDataTransferServiceHook(GoogleBaseHook):
 
         start_time = time.time()
         while time.time() - start_time < timeout:
-            operations = self.list_transfer_operations(
-                request_filter={FILTER_PROJECT_ID: job[PROJECT_ID], FILTER_JOB_NAMES: [job[NAME]]}
-            )
+            request_filter = {FILTER_PROJECT_ID: job[PROJECT_ID], FILTER_JOB_NAMES: [job[NAME]]}
+            operations = self.list_transfer_operations(request_filter=request_filter)
+
+            for operation in operations:
+                self.log.info("Progress for operation %s: %s", operation[NAME], operation[METADATA][COUNTERS])
 
-            if CloudDataTransferServiceHook.operations_contain_expected_statuses(
-                operations, expected_statuses
-            ):
+            if self.operations_contain_expected_statuses(operations, expected_statuses):
                 return
             time.sleep(TIME_TO_SLEEP_IN_SECONDS)
         raise AirflowException("Timeout. The operation could not be completed within the allotted time.")
diff --git a/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py b/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py
index 75c8ac0..dec6de1 100644
--- a/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py
+++ b/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py
@@ -18,7 +18,12 @@
 """This module contains a Google Cloud Transfer sensor."""
 from typing import Optional, Sequence, Set, Union
 
-from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import CloudDataTransferServiceHook
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
+    CloudDataTransferServiceHook,
+    COUNTERS,
+    METADATA,
+    NAME,
+)
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 
@@ -91,6 +96,9 @@ class CloudDataTransferServiceJobStatusSensor(BaseSensorOperator):
             request_filter={'project_id': self.project_id, 'job_names': [self.job_name]}
         )
 
+        for operation in operations:
+            self.log.info("Progress for operation %s: %s", operation[NAME], operation[METADATA][COUNTERS])
+
         check = CloudDataTransferServiceHook.operations_contain_expected_statuses(
             operations=operations, expected_statuses=self.expected_statuses
         )
diff --git a/tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service.py b/tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service.py
index 229f0c7..4fda2ad 100644
--- a/tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service.py
+++ b/tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service.py
@@ -28,6 +28,7 @@ from parameterized import parameterized
 
 from airflow.exceptions import AirflowException
 from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
+    COUNTERS,
     DESCRIPTION,
     FILTER_JOB_NAMES,
     FILTER_PROJECT_ID,
@@ -81,6 +82,12 @@ TEST_RESULT_STATUS_ENABLED = {STATUS: GcpTransferJobsStatus.ENABLED}
 TEST_RESULT_STATUS_DISABLED = {STATUS: GcpTransferJobsStatus.DISABLED}
 TEST_RESULT_STATUS_DELETED = {STATUS: GcpTransferJobsStatus.DELETED}
 
+TEST_NAME = "transferOperations/transferJobs-123-456"
+TEST_COUNTERS = {
+    "bytesFoundFromSource": 512,
+    "bytesCopiedToSink": 1024,
+}
+
 
 def _without_key(body, key):
     obj = deepcopy(body)
@@ -370,8 +377,24 @@ class TestGCPTransferServiceHookWithPassedProjectId(unittest.TestCase):
     )
     def test_wait_for_transfer_job(self, mock_list, mock_sleep, mock_project_id):
         mock_list.side_effect = [
-            [{METADATA: {STATUS: GcpTransferOperationStatus.IN_PROGRESS}}],
-            [{METADATA: {STATUS: GcpTransferOperationStatus.SUCCESS}}],
+            [
+                {
+                    NAME: TEST_NAME,
+                    METADATA: {
+                        STATUS: GcpTransferOperationStatus.IN_PROGRESS,
+                        COUNTERS: TEST_COUNTERS,
+                    },
+                },
+            ],
+            [
+                {
+                    NAME: TEST_NAME,
+                    METADATA: {
+                        STATUS: GcpTransferOperationStatus.SUCCESS,
+                        COUNTERS: TEST_COUNTERS,
+                    },
+                },
+            ],
         ]
 
         job_name = 'transferJobs/test-job'
@@ -400,7 +423,13 @@ class TestGCPTransferServiceHookWithPassedProjectId(unittest.TestCase):
         list_execute_method = list_method.return_value.execute
         list_execute_method.return_value = {
             OPERATIONS: [
-                {NAME: TEST_TRANSFER_OPERATION_NAME, METADATA: {STATUS: GcpTransferOperationStatus.FAILED}}
+                {
+                    NAME: TEST_TRANSFER_OPERATION_NAME,
+                    METADATA: {
+                        STATUS: GcpTransferOperationStatus.FAILED,
+                        COUNTERS: TEST_COUNTERS,
+                    },
+                }
             ]
         }
 
@@ -427,7 +456,13 @@ class TestGCPTransferServiceHookWithPassedProjectId(unittest.TestCase):
         list_execute_method = list_method.return_value.execute
         list_execute_method.return_value = {
             OPERATIONS: [
-                {NAME: TEST_TRANSFER_OPERATION_NAME, METADATA: {STATUS: GcpTransferOperationStatus.FAILED}}
+                {
+                    NAME: TEST_TRANSFER_OPERATION_NAME,
+                    METADATA: {
+                        STATUS: GcpTransferOperationStatus.FAILED,
+                        COUNTERS: TEST_COUNTERS,
+                    },
+                }
             ]
         }
 
@@ -498,7 +533,10 @@ class TestGCPTransferServiceHookWithPassedProjectId(unittest.TestCase):
         ]
     )
     def test_operations_contain_expected_statuses_green_path(self, statuses, expected_statuses):
-        operations = [{NAME: TEST_TRANSFER_OPERATION_NAME, METADATA: {STATUS: status}} for status in statuses]
+        operations = [
+            {NAME: TEST_TRANSFER_OPERATION_NAME, METADATA: {STATUS: status, COUNTERS: TEST_COUNTERS}}
+            for status in statuses
+        ]
 
         result = CloudDataTransferServiceHook.operations_contain_expected_statuses(
             operations, expected_statuses
diff --git a/tests/providers/google/cloud/sensors/test_cloud_storage_transfer_service.py b/tests/providers/google/cloud/sensors/test_cloud_storage_transfer_service.py
index 9d7bb4d..e867469 100644
--- a/tests/providers/google/cloud/sensors/test_cloud_storage_transfer_service.py
+++ b/tests/providers/google/cloud/sensors/test_cloud_storage_transfer_service.py
@@ -25,13 +25,27 @@ from airflow.providers.google.cloud.sensors.cloud_storage_transfer_service impor
     CloudDataTransferServiceJobStatusSensor,
 )
 
+TEST_NAME = "transferOperations/transferJobs-123-456"
+TEST_COUNTERS = {
+    "bytesFoundFromSource": 512,
+    "bytesCopiedToSink": 1024,
+}
+
 
 class TestGcpStorageTransferOperationWaitForJobStatusSensor(unittest.TestCase):
     @mock.patch(
         'airflow.providers.google.cloud.sensors.cloud_storage_transfer_service.CloudDataTransferServiceHook'
     )
     def test_wait_for_status_success(self, mock_tool):
-        operations = [{'metadata': {'status': GcpTransferOperationStatus.SUCCESS}}]
+        operations = [
+            {
+                'name': TEST_NAME,
+                'metadata': {
+                    'status': GcpTransferOperationStatus.SUCCESS,
+                    'counters': TEST_COUNTERS,
+                },
+            }
+        ]
         mock_tool.return_value.list_transfer_operations.return_value = operations
         mock_tool.operations_contain_expected_statuses.return_value = True
 
@@ -79,8 +93,24 @@ class TestGcpStorageTransferOperationWaitForJobStatusSensor(unittest.TestCase):
     )
     def test_wait_for_status_after_retry(self, mock_tool):
         operations_set = [
-            [{'metadata': {'status': GcpTransferOperationStatus.SUCCESS}}],
-            [{'metadata': {'status': GcpTransferOperationStatus.SUCCESS}}],
+            [
+                {
+                    'name': TEST_NAME,
+                    'metadata': {
+                        'status': GcpTransferOperationStatus.SUCCESS,
+                        'counters': TEST_COUNTERS,
+                    },
+                },
+            ],
+            [
+                {
+                    'name': TEST_NAME,
+                    'metadata': {
+                        'status': GcpTransferOperationStatus.SUCCESS,
+                        'counters': TEST_COUNTERS,
+                    },
+                },
+            ],
         ]
 
         mock_tool.return_value.list_transfer_operations.side_effect = operations_set
@@ -124,7 +154,15 @@ class TestGcpStorageTransferOperationWaitForJobStatusSensor(unittest.TestCase):
         'airflow.providers.google.cloud.sensors.cloud_storage_transfer_service.CloudDataTransferServiceHook'
     )
     def test_wait_for_status_normalize_status(self, expected_status, received_status, mock_tool):
-        operations = [{'metadata': {'status': GcpTransferOperationStatus.SUCCESS}}]
+        operations = [
+            {
+                'name': TEST_NAME,
+                'metadata': {
+                    'status': GcpTransferOperationStatus.SUCCESS,
+                    'counters': TEST_COUNTERS,
+                },
+            }
+        ]
 
         mock_tool.return_value.list_transfer_operations.return_value = operations
         mock_tool.operations_contain_expected_statuses.side_effect = [False, True]

Reply via email to