Lee-W commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1254101538
##########
airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py:
##########
@@ -466,3 +472,53 @@ def operations_contain_expected_statuses(
f"Expected: {', '.join(expected_statuses_set)}"
)
return False
+
+
+class CloudDataTransferServiceAsyncHook(GoogleBaseAsyncHook):
+ """Asynchronous hook for Google Storage Transfer Service."""
+
+ def __init__(self, project_id: str | None = None, **kwargs: Any):
Review Comment:
nitpick
```suggestion
def __init__(self, project_id: str | None = None, **kwargs: Any) -> None:
```
##########
airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py:
##########
@@ -466,3 +472,53 @@ def operations_contain_expected_statuses(
f"Expected: {', '.join(expected_statuses_set)}"
)
return False
+
+
+class CloudDataTransferServiceAsyncHook(GoogleBaseAsyncHook):
+ """Asynchronous hook for Google Storage Transfer Service."""
+
+ def __init__(self, project_id: str | None = None, **kwargs: Any):
+ super().__init__(**kwargs)
+ self.project_id = project_id
+ self._client: storage_transfer_v1.StorageTransferServiceAsyncClient |
None = None
+
+ def get_conn(self):
Review Comment:
nitpick: could we please add a return type annotation here
##########
airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py:
##########
@@ -466,3 +472,53 @@ def operations_contain_expected_statuses(
f"Expected: {', '.join(expected_statuses_set)}"
)
return False
+
+
+class CloudDataTransferServiceAsyncHook(GoogleBaseAsyncHook):
+ """Asynchronous hook for Google Storage Transfer Service."""
+
+ def __init__(self, project_id: str | None = None, **kwargs: Any):
+ super().__init__(**kwargs)
+ self.project_id = project_id
+ self._client: storage_transfer_v1.StorageTransferServiceAsyncClient |
None = None
+
+ def get_conn(self):
+ """
+ Returns async connection to the Storage Transfer Service.
+
+ :return: Google Storage Transfer asynchronous client.
+ """
+ if not self._client:
+ try:
+ self._client =
storage_transfer_v1.StorageTransferServiceAsyncClient()
+ except GoogleAuthError as ex:
Review Comment:
Do we need to catch the exception here? Should we instead handle the exception
where this function is called? It might be a bit confusing to call this function
and get an `AirflowException` instead of a connection-related one.
##########
airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+from typing import Any, AsyncIterator
+
+from google.api_core.exceptions import GoogleAPIError
+from google.cloud.storage_transfer_v1.types import TransferOperation
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service
import (
+ CloudDataTransferServiceAsyncHook,
+)
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class CloudStorageTransferServiceCreateJobsTrigger(BaseTrigger):
+ """
+ StorageTransferJobTrigger run on the trigger worker to perform Cloud
Storage Transfer job.
+
+ :param job_names: List of transfer jobs names.
+ :param project_id: GCP project id.
+ :param poll_interval: Interval in seconds between polls.
+ """
+
+ def __init__(self, job_names: list[str], project_id: str | None = None,
poll_interval: int = 10):
Review Comment:
nitpick
```suggestion
def __init__(self, job_names: list[str], project_id: str | None = None,
poll_interval: int = 10) -> None:
```
##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -221,3 +246,104 @@ def gcs_to_s3_object(self, gcs_object: str) -> str:
if self.apply_gcs_prefix:
return self.prefix + s3_object
return s3_object
+
+ def transfer_files(self, s3_objects: list[str], gcs_hook: GCSHook,
s3_hook: S3Hook):
Review Comment:
nitpick
```suggestion
def transfer_files(self, s3_objects: list[str], gcs_hook: GCSHook,
s3_hook: S3Hook) -> None:
```
##########
airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py:
##########
@@ -466,3 +472,53 @@ def operations_contain_expected_statuses(
f"Expected: {', '.join(expected_statuses_set)}"
)
return False
+
+
+class CloudDataTransferServiceAsyncHook(GoogleBaseAsyncHook):
+ """Asynchronous hook for Google Storage Transfer Service."""
+
+ def __init__(self, project_id: str | None = None, **kwargs: Any):
+ super().__init__(**kwargs)
+ self.project_id = project_id
+ self._client: storage_transfer_v1.StorageTransferServiceAsyncClient |
None = None
+
+ def get_conn(self):
+ """
+ Returns async connection to the Storage Transfer Service.
+
+ :return: Google Storage Transfer asynchronous client.
+ """
+ if not self._client:
+ try:
+ self._client =
storage_transfer_v1.StorageTransferServiceAsyncClient()
+ except GoogleAuthError as ex:
+ raise AirflowException(ex)
+ return self._client
+
+ async def get_jobs(self, job_names: list[str]):
+ """
+ Gets the latest state of a long-running operations in Google Storage
Transfer Service.
+
+ :param job_names: (Required) List of names of the jobs to be fetched.
+ :return: Object that yields Transfer jobs.
+ """
+ client = self.get_conn()
+ jobs_list_request = storage_transfer_v1.ListTransferJobsRequest(
+ filter=json.dumps(dict(project_id=self.project_id,
job_names=job_names))
+ )
+ return await client.list_transfer_jobs(request=jobs_list_request)
+
+ async def get_latest_operation(self, job: storage_transfer_v1.TransferJob)
-> Message | None:
+ """
+ Gets the latest operation of the given TransferJob instance.
+
+ :param job: Transfer job instance.
+ :return: The latest job operation.
+ """
+ latest_operation_name = job.latest_operation_name
+ if latest_operation_name:
+ client = self.get_conn()
+ response_operation = await
client.transport.operations_client.get_operation(latest_operation_name)
+ operation =
storage_transfer_v1.TransferOperation.deserialize(response_operation.metadata.value)
+ return operation
+ return None
Review Comment:
I don't think we need the explicit `return None` here.
```suggestion
```
##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -221,3 +246,104 @@ def gcs_to_s3_object(self, gcs_object: str) -> str:
if self.apply_gcs_prefix:
return self.prefix + s3_object
return s3_object
+
+ def transfer_files(self, s3_objects: list[str], gcs_hook: GCSHook,
s3_hook: S3Hook):
+ if s3_objects:
+ dest_gcs_bucket, dest_gcs_object_prefix =
_parse_gcs_url(self.dest_gcs)
+ for obj in s3_objects:
+ # GCS hook builds its own in-memory file, so we have to create
+ # and pass the path
+ file_object = s3_hook.get_key(obj, self.bucket)
+ with NamedTemporaryFile(mode="wb", delete=True) as file:
+ file_object.download_fileobj(file)
+ file.flush()
+ gcs_file = self.s3_to_gcs_object(s3_object=obj)
+ gcs_hook.upload(dest_gcs_bucket, gcs_file, file.name,
gzip=self.gzip)
+
+ self.log.info("All done, uploaded %d files to Google Cloud
Storage", len(s3_objects))
+
+ def transfer_files_async(self, files: list[str], gcs_hook: GCSHook,
s3_hook: S3Hook):
+ """Submits Google Cloud Storage Transfer Service job to copy files
from AWS S3 to GCS."""
+ if len(files) <= 0:
Review Comment:
Should we just go with an empty check?
```suggestion
        if not files:
```
##########
airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py:
##########
@@ -466,3 +472,53 @@ def operations_contain_expected_statuses(
f"Expected: {', '.join(expected_statuses_set)}"
)
return False
+
+
+class CloudDataTransferServiceAsyncHook(GoogleBaseAsyncHook):
+ """Asynchronous hook for Google Storage Transfer Service."""
+
+ def __init__(self, project_id: str | None = None, **kwargs: Any):
+ super().__init__(**kwargs)
+ self.project_id = project_id
+ self._client: storage_transfer_v1.StorageTransferServiceAsyncClient |
None = None
+
+ def get_conn(self):
+ """
+ Returns async connection to the Storage Transfer Service.
+
+ :return: Google Storage Transfer asynchronous client.
+ """
+ if not self._client:
+ try:
+ self._client =
storage_transfer_v1.StorageTransferServiceAsyncClient()
+ except GoogleAuthError as ex:
+ raise AirflowException(ex)
+ return self._client
+
+ async def get_jobs(self, job_names: list[str]):
Review Comment:
nitpick: could we please add a return type annotation here
##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -221,3 +246,104 @@ def gcs_to_s3_object(self, gcs_object: str) -> str:
if self.apply_gcs_prefix:
return self.prefix + s3_object
return s3_object
+
+ def transfer_files(self, s3_objects: list[str], gcs_hook: GCSHook,
s3_hook: S3Hook):
+ if s3_objects:
+ dest_gcs_bucket, dest_gcs_object_prefix =
_parse_gcs_url(self.dest_gcs)
+ for obj in s3_objects:
+ # GCS hook builds its own in-memory file, so we have to create
+ # and pass the path
+ file_object = s3_hook.get_key(obj, self.bucket)
+ with NamedTemporaryFile(mode="wb", delete=True) as file:
+ file_object.download_fileobj(file)
+ file.flush()
+ gcs_file = self.s3_to_gcs_object(s3_object=obj)
+ gcs_hook.upload(dest_gcs_bucket, gcs_file, file.name,
gzip=self.gzip)
+
+ self.log.info("All done, uploaded %d files to Google Cloud
Storage", len(s3_objects))
+
+ def transfer_files_async(self, files: list[str], gcs_hook: GCSHook,
s3_hook: S3Hook):
Review Comment:
nitpick
```suggestion
def transfer_files_async(self, files: list[str], gcs_hook: GCSHook,
s3_hook: S3Hook) -> None:
```
##########
tests/system/providers/google/cloud/gcs/example_s3_to_gcs_async.py:
##########
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow import models
+from airflow.decorators import task
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator,
S3DeleteBucketOperator
+from airflow.providers.google.cloud.operators.gcs import
GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = "example_s3_to_gcs"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+GCS_BUCKET_URL = f"gs://{BUCKET_NAME}/"
+UPLOAD_FILE = "/tmp/example-file.txt"
+PREFIX = "TESTS"
+
+
+@task(task_id="upload_file_to_s3")
+def upload_file():
+ """A callable to upload file to AWS bucket"""
+ s3_hook = S3Hook()
+ s3_hook.load_file(filename=UPLOAD_FILE, key=PREFIX,
bucket_name=BUCKET_NAME)
+
+
+with models.DAG(
+ DAG_ID,
+ schedule="@once",
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+ tags=["example", "s3"],
+) as dag:
+ create_s3_bucket = S3CreateBucketOperator(
+ task_id="create_s3_bucket", bucket_name=BUCKET_NAME,
region_name="us-east-1"
+ )
+
+ create_gcs_bucket = GCSCreateBucketOperator(
+ task_id="create_bucket",
+ bucket_name=BUCKET_NAME,
+ project_id=GCP_PROJECT_ID,
+ )
+ # [START howto_transfer_s3togcs_operator_async]
Review Comment:
```suggestion
# [START howto_transfer_s3togcs_operator_async]
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]