sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r260997416
 
 

 ##########
 File path: airflow/contrib/hooks/gcp_transfer_hook.py
 ##########
 @@ -52,56 +76,277 @@ def get_conn(self):
         """
         if not self._conn:
             http_authorized = self._authorize()
-            self._conn = build('storagetransfer', self.api_version,
-                               http=http_authorized, cache_discovery=False)
+            self._conn = build(
+                'storagetransfer', self.api_version, http=http_authorized, 
cache_discovery=False
+            )
         return self._conn
 
-    def create_transfer_job(self, description, schedule, transfer_spec, 
project_id=None):
-        transfer_job = {
-            'status': 'ENABLED',
-            'projectId': project_id or self.project_id,
-            'description': description,
-            'transferSpec': transfer_spec,
-            'schedule': schedule or self._schedule_once_now(),
-        }
-        return 
self.get_conn().transferJobs().create(body=transfer_job).execute()
-
-    def wait_for_transfer_job(self, job):
+    @GoogleCloudBaseHook.catch_http_exception
+    def create_transfer_job(self, body):
+        """
+        Creates a transfer job that runs periodically.
+
+        :param body: (Required) A request body, as described in
+            
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/patch#request-body
+        :type body: dict
+        :return: transfer job.
+            See:
+            
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs#TransferJob
+        :rtype: dict
+        """
+        body = self._inject_project_id(body, 'body project_id')
+        return 
self.get_conn().transferJobs().create(body=body).execute(num_retries=NUM_RETRIES)
+
+    @GoogleCloudBaseHook.fallback_to_default_project_id
+    @GoogleCloudBaseHook.catch_http_exception
+    def get_transfer_job(self, job_name, project_id=None):
+        """
+        Gets the latest state of a long-running operation in Google Storage
+        Transfer Service.
+
+        :param job_name: (Required) Name of the job to be fetched
+        :type job_name: str
+        :param project_id: (Optional) the ID of the project that owns the 
Transfer
+            Job. If set to None or missing, the default project_id from the GCP
+            connection is used.
+        :type project_id: str
+        :return: Transfer Job
+        :rtype: dict
+        """
+        return (
+            self.get_conn()
+            .transferJobs()
+            .get(jobName=job_name, projectId=project_id)
+            .execute(num_retries=NUM_RETRIES)
+        )
+
+    def list_transfer_job(self, filter):
+        """
+        Lists long-running operations in Google Storage Transfer
+        Service that match the specified filter.
+
+        :param filter: (Required) A request filter, as described in
+            
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/list#body.QUERY_PARAMETERS.filter
+        :type filter: dict
+        :return: List of Transfer Jobs
+        :rtype: list[dict]
+        """
+        conn = self.get_conn()
+        filter = self._inject_project_id(filter, 'filter project_id')
+        request = conn.transferJobs().list(filter=json.dumps(filter))
+        jobs = []
+
+        while request is not None:
+            response = request.execute(num_retries=NUM_RETRIES)
+            jobs.extend(response['transferJobs'])
+
+            request = conn.transferJobs().list_next(previous_request=request, 
previous_response=response)
+
+        return jobs
+
+    @GoogleCloudBaseHook.catch_http_exception
+    def update_transfer_job(self, job_name, body):
+        """
+        Updates a transfer job that runs periodically.
+
+        :param job_name: (Required) Name of the job to be updated
+        :type job_name: str
+        :param body: A request body, as described in
+            
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/patch#request-body
+        :type body: dict
+        :return: If successful, TransferJob.
+        :rtype: dict
+        """
+        body = self._inject_project_id(body, 'body project_id')
+        return (
+            self.get_conn().transferJobs().patch(jobName=job_name, 
body=body).execute(num_retries=NUM_RETRIES)
+        )
+
+    @GoogleCloudBaseHook.fallback_to_default_project_id
+    @GoogleCloudBaseHook.catch_http_exception
+    def delete_transfer_job(self, job_name, project_id):
+        """
+        Deletes a transfer job. This is a soft delete. After a transfer job is
+        deleted, the job and all the transfer executions are subject to garbage
+        collection. Transfer jobs become eligible for garbage collection
+        30 days after soft delete.
+
+        :param job_name: (Required) Name of the job to be deleted
+        :type job_name: str
+        :param project_id: (Optional) the ID of the project that owns the 
Transfer
+            Job. If set to None or missing, the default project_id from the GCP
+            connection is used.
+        :type project_id: str
+        :rtype: None
+        """
+
+        return (
+            self.get_conn()
+            .transferJobs()
+            .patch(
+                jobName=job_name,
+                body={
+                    'project_id': project_id,
+                    'transfer_job': {'status': GcpTransferJobsStatus.DELETED},
+                    'update_transfer_job_field_mask': 'status',
+                },
+            )
+            .execute(num_retries=NUM_RETRIES)
+        )
+
+    @GoogleCloudBaseHook.catch_http_exception
+    def cancel_transfer_operation(self, operation_name):
+        """
+        Cancels an transfer operation in Google Storage Transfer Service.
+
+        :param operation_name: Name of the transfer operation.
+        :type operation_name: str
+        :rtype: None
+        """
+        
self.get_conn().transferOperations().cancel(name=operation_name).execute(num_retries=NUM_RETRIES)
+
+    @GoogleCloudBaseHook.catch_http_exception
+    def get_transfer_operation(self, operation_name):
+        """
+        Gets an transfer operation in Google Storage Transfer Service.
+
+        :param operation_name: (Required) Name of the transfer operation.
+        :type operation_name: str
+        :return: transfer operation
+            See:
+            
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/Operation
+        :rtype: dict
+        """
+        return 
self.get_conn().transferOperations().get(name=operation_name).execute(num_retries=NUM_RETRIES)
+
+    @GoogleCloudBaseHook.catch_http_exception
+    def list_transfer_operations(self, filter):
+        """
+        Gets an transfer operation in Google Storage Transfer Service.
+
+        :param filter: (Required) A request filter, as described in
+            
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/list#body.QUERY_PARAMETERS.filter
+            With one additional improvement:
+
+            * project_id is optional if you have a project id defined
+              in the connection
+              See: :ref:`connection-type-GCP`
+
+        :type filter: dict
+        :return: transfer operation
+        :rtype: list[dict]
+        """
+        conn = self.get_conn()
+
+        filter = self._inject_project_id(filter, 'filter project_id')
+
+        operations = []
+
+        request = conn.transferOperations().list(name=TRANSFER_OPERATIONS, 
filter=json.dumps(filter))
+
+        while request is not None:
+            response = request.execute(num_retries=NUM_RETRIES)
+            if 'operations' in response:
+                operations.extend(response['operations'])
+
+            request = conn.transferOperations().list_next(
+                previous_request=request, previous_response=response
+            )
+
+        return operations
+
+    @GoogleCloudBaseHook.catch_http_exception
+    def pause_transfer_operation(self, operation_name):
+        """
+        Pauses an transfer operation in Google Storage Transfer Service.
+
+        :param operation_name: (Required) Name of the transfer operation.
+        :type operation_name: str
+        :rtype: None
+        """
+        
self.get_conn().transferOperations().pause(name=operation_name).execute(num_retries=NUM_RETRIES)
+
+    @GoogleCloudBaseHook.catch_http_exception
+    def resume_transfer_operation(self, operation_name):
+        """
+        Resumes an transfer operation in Google Storage Transfer Service.
+
+        :param operation_name: (Required) Name of the transfer operation.
+        :type operation_name: str
+        :rtype: None
+        """
+        
self.get_conn().transferOperations().resume(name=operation_name).execute(num_retries=NUM_RETRIES)
+
+    @GoogleCloudBaseHook.catch_http_exception
+    def wait_for_transfer_job(self, job, 
expected_statuses=(GcpTransferOperationStatus.SUCCESS,)):
 
 Review comment:
   Should there be a timeout option?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to