This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new e324b37 Add job name and progress logs to Cloud Storage Transfer Hook
(#12014)
e324b37 is described below
commit e324b37a67e32c368df50604a00160d7766b5c33
Author: Joshua Carp <[email protected]>
AuthorDate: Mon Nov 2 19:00:34 2020 -0500
Add job name and progress logs to Cloud Storage Transfer Hook (#12014)
---
.../cloud/hooks/cloud_storage_transfer_service.py | 18 ++++----
.../sensors/cloud_storage_transfer_service.py | 10 ++++-
.../hooks/test_cloud_storage_transfer_service.py | 48 +++++++++++++++++++---
.../sensors/test_cloud_storage_transfer_service.py | 46 +++++++++++++++++++--
4 files changed, 104 insertions(+), 18 deletions(-)
diff --git
a/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py
b/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py
index 0e30cb0..b7aae10 100644
--- a/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py
+++ b/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py
@@ -62,7 +62,7 @@ AWS_ACCESS_KEY = "awsAccessKey"
AWS_S3_DATA_SOURCE = 'awsS3DataSource'
BODY = 'body'
BUCKET_NAME = 'bucketName'
-JOB_NAME = 'name'
+COUNTERS = 'counters'
DAY = 'day'
DESCRIPTION = "description"
FILTER = 'filter'
@@ -72,6 +72,7 @@ GCS_DATA_SINK = 'gcsDataSink'
GCS_DATA_SOURCE = 'gcsDataSource'
HOURS = "hours"
HTTP_DATA_SOURCE = 'httpDataSource'
+JOB_NAME = 'name'
LIST_URL = 'list_url'
METADATA = 'metadata'
MINUTES = "minutes"
@@ -89,8 +90,8 @@ START_TIME_OF_DAY = 'startTimeOfDay'
STATUS = "status"
STATUS1 = 'status'
TRANSFER_JOB = 'transfer_job'
-TRANSFER_JOB_FIELD_MASK = 'update_transfer_job_field_mask'
TRANSFER_JOBS = 'transferJobs'
+TRANSFER_JOB_FIELD_MASK = 'update_transfer_job_field_mask'
TRANSFER_OPERATIONS = 'transferOperations'
TRANSFER_OPTIONS = 'transfer_options'
TRANSFER_SPEC = 'transferSpec'
@@ -193,6 +194,7 @@ class CloudDataTransferServiceHook(GoogleBaseHook):
return self.enable_transfer_job(job_name=job_name,
project_id=body.get(PROJECT_ID))
else:
raise e
+ self.log.info("Created job %s", transfer_job[NAME])
return transfer_job
@GoogleBaseHook.fallback_to_default_project_id
@@ -467,13 +469,13 @@ class CloudDataTransferServiceHook(GoogleBaseHook):
start_time = time.time()
while time.time() - start_time < timeout:
- operations = self.list_transfer_operations(
- request_filter={FILTER_PROJECT_ID: job[PROJECT_ID],
FILTER_JOB_NAMES: [job[NAME]]}
- )
+ request_filter = {FILTER_PROJECT_ID: job[PROJECT_ID],
FILTER_JOB_NAMES: [job[NAME]]}
+ operations =
self.list_transfer_operations(request_filter=request_filter)
+
+ for operation in operations:
+ self.log.info("Progress for operation %s: %s",
operation[NAME], operation[METADATA][COUNTERS])
- if
CloudDataTransferServiceHook.operations_contain_expected_statuses(
- operations, expected_statuses
- ):
+ if self.operations_contain_expected_statuses(operations,
expected_statuses):
return
time.sleep(TIME_TO_SLEEP_IN_SECONDS)
raise AirflowException("Timeout. The operation could not be completed
within the allotted time.")
diff --git
a/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py
b/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py
index 75c8ac0..dec6de1 100644
--- a/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py
+++ b/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py
@@ -18,7 +18,12 @@
"""This module contains a Google Cloud Transfer sensor."""
from typing import Optional, Sequence, Set, Union
-from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service
import CloudDataTransferServiceHook
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service
import (
+ CloudDataTransferServiceHook,
+ COUNTERS,
+ METADATA,
+ NAME,
+)
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
@@ -91,6 +96,9 @@ class
CloudDataTransferServiceJobStatusSensor(BaseSensorOperator):
request_filter={'project_id': self.project_id, 'job_names':
[self.job_name]}
)
+ for operation in operations:
+ self.log.info("Progress for operation %s: %s", operation[NAME],
operation[METADATA][COUNTERS])
+
check =
CloudDataTransferServiceHook.operations_contain_expected_statuses(
operations=operations, expected_statuses=self.expected_statuses
)
diff --git
a/tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service.py
b/tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service.py
index 229f0c7..4fda2ad 100644
--- a/tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service.py
+++ b/tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service.py
@@ -28,6 +28,7 @@ from parameterized import parameterized
from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service
import (
+ COUNTERS,
DESCRIPTION,
FILTER_JOB_NAMES,
FILTER_PROJECT_ID,
@@ -81,6 +82,12 @@ TEST_RESULT_STATUS_ENABLED = {STATUS:
GcpTransferJobsStatus.ENABLED}
TEST_RESULT_STATUS_DISABLED = {STATUS: GcpTransferJobsStatus.DISABLED}
TEST_RESULT_STATUS_DELETED = {STATUS: GcpTransferJobsStatus.DELETED}
+TEST_NAME = "transferOperations/transferJobs-123-456"
+TEST_COUNTERS = {
+ "bytesFoundFromSource": 512,
+ "bytesCopiedToSink": 1024,
+}
+
def _without_key(body, key):
obj = deepcopy(body)
@@ -370,8 +377,24 @@ class
TestGCPTransferServiceHookWithPassedProjectId(unittest.TestCase):
)
def test_wait_for_transfer_job(self, mock_list, mock_sleep,
mock_project_id):
mock_list.side_effect = [
- [{METADATA: {STATUS: GcpTransferOperationStatus.IN_PROGRESS}}],
- [{METADATA: {STATUS: GcpTransferOperationStatus.SUCCESS}}],
+ [
+ {
+ NAME: TEST_NAME,
+ METADATA: {
+ STATUS: GcpTransferOperationStatus.IN_PROGRESS,
+ COUNTERS: TEST_COUNTERS,
+ },
+ },
+ ],
+ [
+ {
+ NAME: TEST_NAME,
+ METADATA: {
+ STATUS: GcpTransferOperationStatus.SUCCESS,
+ COUNTERS: TEST_COUNTERS,
+ },
+ },
+ ],
]
job_name = 'transferJobs/test-job'
@@ -400,7 +423,13 @@ class
TestGCPTransferServiceHookWithPassedProjectId(unittest.TestCase):
list_execute_method = list_method.return_value.execute
list_execute_method.return_value = {
OPERATIONS: [
- {NAME: TEST_TRANSFER_OPERATION_NAME, METADATA: {STATUS:
GcpTransferOperationStatus.FAILED}}
+ {
+ NAME: TEST_TRANSFER_OPERATION_NAME,
+ METADATA: {
+ STATUS: GcpTransferOperationStatus.FAILED,
+ COUNTERS: TEST_COUNTERS,
+ },
+ }
]
}
@@ -427,7 +456,13 @@ class
TestGCPTransferServiceHookWithPassedProjectId(unittest.TestCase):
list_execute_method = list_method.return_value.execute
list_execute_method.return_value = {
OPERATIONS: [
- {NAME: TEST_TRANSFER_OPERATION_NAME, METADATA: {STATUS:
GcpTransferOperationStatus.FAILED}}
+ {
+ NAME: TEST_TRANSFER_OPERATION_NAME,
+ METADATA: {
+ STATUS: GcpTransferOperationStatus.FAILED,
+ COUNTERS: TEST_COUNTERS,
+ },
+ }
]
}
@@ -498,7 +533,10 @@ class
TestGCPTransferServiceHookWithPassedProjectId(unittest.TestCase):
]
)
def test_operations_contain_expected_statuses_green_path(self, statuses,
expected_statuses):
- operations = [{NAME: TEST_TRANSFER_OPERATION_NAME, METADATA: {STATUS:
status}} for status in statuses]
+ operations = [
+ {NAME: TEST_TRANSFER_OPERATION_NAME, METADATA: {STATUS: status,
COUNTERS: TEST_COUNTERS}}
+ for status in statuses
+ ]
result =
CloudDataTransferServiceHook.operations_contain_expected_statuses(
operations, expected_statuses
diff --git
a/tests/providers/google/cloud/sensors/test_cloud_storage_transfer_service.py
b/tests/providers/google/cloud/sensors/test_cloud_storage_transfer_service.py
index 9d7bb4d..e867469 100644
---
a/tests/providers/google/cloud/sensors/test_cloud_storage_transfer_service.py
+++
b/tests/providers/google/cloud/sensors/test_cloud_storage_transfer_service.py
@@ -25,13 +25,27 @@ from
airflow.providers.google.cloud.sensors.cloud_storage_transfer_service import
CloudDataTransferServiceJobStatusSensor,
)
+TEST_NAME = "transferOperations/transferJobs-123-456"
+TEST_COUNTERS = {
+ "bytesFoundFromSource": 512,
+ "bytesCopiedToSink": 1024,
+}
+
class TestGcpStorageTransferOperationWaitForJobStatusSensor(unittest.TestCase):
@mock.patch(
'airflow.providers.google.cloud.sensors.cloud_storage_transfer_service.CloudDataTransferServiceHook'
)
def test_wait_for_status_success(self, mock_tool):
- operations = [{'metadata': {'status':
GcpTransferOperationStatus.SUCCESS}}]
+ operations = [
+ {
+ 'name': TEST_NAME,
+ 'metadata': {
+ 'status': GcpTransferOperationStatus.SUCCESS,
+ 'counters': TEST_COUNTERS,
+ },
+ }
+ ]
mock_tool.return_value.list_transfer_operations.return_value =
operations
mock_tool.operations_contain_expected_statuses.return_value = True
@@ -79,8 +93,24 @@ class
TestGcpStorageTransferOperationWaitForJobStatusSensor(unittest.TestCase):
)
def test_wait_for_status_after_retry(self, mock_tool):
operations_set = [
- [{'metadata': {'status': GcpTransferOperationStatus.SUCCESS}}],
- [{'metadata': {'status': GcpTransferOperationStatus.SUCCESS}}],
+ [
+ {
+ 'name': TEST_NAME,
+ 'metadata': {
+ 'status': GcpTransferOperationStatus.SUCCESS,
+ 'counters': TEST_COUNTERS,
+ },
+ },
+ ],
+ [
+ {
+ 'name': TEST_NAME,
+ 'metadata': {
+ 'status': GcpTransferOperationStatus.SUCCESS,
+ 'counters': TEST_COUNTERS,
+ },
+ },
+ ],
]
mock_tool.return_value.list_transfer_operations.side_effect =
operations_set
@@ -124,7 +154,15 @@ class
TestGcpStorageTransferOperationWaitForJobStatusSensor(unittest.TestCase):
'airflow.providers.google.cloud.sensors.cloud_storage_transfer_service.CloudDataTransferServiceHook'
)
def test_wait_for_status_normalize_status(self, expected_status,
received_status, mock_tool):
- operations = [{'metadata': {'status':
GcpTransferOperationStatus.SUCCESS}}]
+ operations = [
+ {
+ 'name': TEST_NAME,
+ 'metadata': {
+ 'status': GcpTransferOperationStatus.SUCCESS,
+ 'counters': TEST_COUNTERS,
+ },
+ }
+ ]
mock_tool.return_value.list_transfer_operations.return_value =
operations
mock_tool.operations_contain_expected_statuses.side_effect = [False,
True]