sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create
Google Cloud Transfer Service Operators
URL: https://github.com/apache/airflow/pull/4792#discussion_r260991871
##########
File path: airflow/contrib/hooks/gcp_transfer_hook.py
##########
@@ -52,56 +76,277 @@ def get_conn(self):
"""
if not self._conn:
http_authorized = self._authorize()
- self._conn = build('storagetransfer', self.api_version,
- http=http_authorized, cache_discovery=False)
+ self._conn = build(
+ 'storagetransfer', self.api_version, http=http_authorized,
cache_discovery=False
+ )
return self._conn
- def create_transfer_job(self, description, schedule, transfer_spec,
project_id=None):
- transfer_job = {
- 'status': 'ENABLED',
- 'projectId': project_id or self.project_id,
- 'description': description,
- 'transferSpec': transfer_spec,
- 'schedule': schedule or self._schedule_once_now(),
- }
- return
self.get_conn().transferJobs().create(body=transfer_job).execute()
-
- def wait_for_transfer_job(self, job):
+ @GoogleCloudBaseHook.catch_http_exception
+ def create_transfer_job(self, body):
+ """
+ Creates a transfer job that runs periodically.
+
+ :param body: (Required) A request body, as described in
+
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/patch#request-body
+ :type body: dict
+ :return: transfer job.
+ See:
+
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs#TransferJob
+ :rtype: dict
+ """
+ body = self._inject_project_id(body, 'body project_id')
+ return
self.get_conn().transferJobs().create(body=body).execute(num_retries=NUM_RETRIES)
+
+ @GoogleCloudBaseHook.fallback_to_default_project_id
+ @GoogleCloudBaseHook.catch_http_exception
+ def get_transfer_job(self, job_name, project_id=None):
+ """
+ Gets the latest state of a long-running operation in Google Storage
+ Transfer Service.
+
+ :param job_name: (Required) Name of the job to be fetched
+ :type job_name: str
+ :param project_id: (Optional) the ID of the project that owns the
Transfer
+ Job. If set to None or missing, the default project_id from the GCP
+ connection is used.
+ :type project_id: str
+ :return: Transfer Job
+ :rtype: dict
+ """
+ return (
+ self.get_conn()
+ .transferJobs()
+ .get(jobName=job_name, projectId=project_id)
+ .execute(num_retries=NUM_RETRIES)
+ )
+
+ def list_transfer_job(self, filter):
+ """
+ Lists long-running operations in Google Storage Transfer
+ Service that match the specified filter.
+
+ :param filter: (Required) A request filter, as described in
+
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/list#body.QUERY_PARAMETERS.filter
+ :type filter: dict
+ :return: List of Transfer Jobs
+ :rtype: list[dict]
+ """
+ conn = self.get_conn()
+ filter = self._inject_project_id(filter, 'filter project_id')
+ request = conn.transferJobs().list(filter=json.dumps(filter))
+ jobs = []
+
+ while request is not None:
+ response = request.execute(num_retries=NUM_RETRIES)
+ jobs.extend(response['transferJobs'])
+
+ request = conn.transferJobs().list_next(previous_request=request,
previous_response=response)
+
+ return jobs
+
+ @GoogleCloudBaseHook.catch_http_exception
+ def update_transfer_job(self, job_name, body):
+ """
+ Updates a transfer job that runs periodically.
+
+ :param job_name: (Required) Name of the job to be updated
+ :type job_name: str
+ :param body: A request body, as described in
+
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/patch#request-body
+ :type body: dict
+ :return: If successful, TransferJob.
+ :rtype: dict
+ """
+ body = self._inject_project_id(body, 'body project_id')
+ return (
+ self.get_conn().transferJobs().patch(jobName=job_name,
body=body).execute(num_retries=NUM_RETRIES)
+ )
+
+ @GoogleCloudBaseHook.fallback_to_default_project_id
+ @GoogleCloudBaseHook.catch_http_exception
+ def delete_transfer_job(self, job_name, project_id):
+ """
+ Deletes a transfer job. This is a soft delete. After a transfer job is
+ deleted, the job and all the transfer executions are subject to garbage
+ collection. Transfer jobs become eligible for garbage collection
+ 30 days after soft delete.
+
+ :param job_name: (Required) Name of the job to be deleted
+ :type job_name: str
+ :param project_id: (Optional) the ID of the project that owns the
Transfer
+ Job. If set to None or missing, the default project_id from the GCP
+ connection is used.
+ :type project_id: str
+ :rtype: None
+ """
+
+ return (
+ self.get_conn()
+ .transferJobs()
+ .patch(
+ jobName=job_name,
+ body={
+ 'project_id': project_id,
+ 'transfer_job': {'status': GcpTransferJobsStatus.DELETED},
Review comment:
Will this actually stop the GcpTransfer? Mind pointing to docs that describe
how this works?
My concern is that the true delete operation isn't implemented and it might
be misleading to name this delete_transfer_job. The naming concern also stands
for the operator name.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services