turbaszek commented on a change in pull request #10947: URL: https://github.com/apache/airflow/pull/10947#discussion_r491856772
########## File path: airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py ########## @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import os + +from airflow import models +from airflow.providers.amazon.aws.sensors.glacier import GlacierJobOperationSensor +from airflow.providers.amazon.aws.transfers.glacier_to_gcs import ( + GlacierCreateJobOperator, + GlacierDownloadArchive, + GlacierTransferDataToGCS, +) +from airflow.utils.dates import days_ago + +VAULT_NAME = "airflow" +BUCKET_NAME = os.environ.get("GCS_BUCKET_NAME", "gs://glacier_bucket") + +with models.DAG( + "example_glacier_to_gcs", schedule_interval=None, start_date=days_ago(1), # Override to match your needs +) as dag: + # [START howto_glacier_create_job_operator] + create_glacier_job = GlacierCreateJobOperator( + task_id="create_glacier_job", aws_conn_id="aws_default", vault_name=VAULT_NAME, + ) + JOB_ID = '{{ task_instance.xcom_pull("create_glacier_job")["jobId"] }}' Review comment: It may be faster as you read only a single string instead of JSON object. But for me, the main point is that I don't have to know the exact structure of the returned dictionary. 
I just need to know that someone already extracted the job id from it and saved it in XCom under a key. ########## File path: airflow/providers/amazon/aws/transfers/glacier_to_gcs.py ########## @@ -0,0 +1,162 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import tempfile +from typing import Optional, Union, Sequence + +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.glacier import GlacierHook +from airflow.providers.google.cloud.hooks.gcs import GCSHook +from airflow.utils.decorators import apply_defaults + + +class GlacierCreateJobOperator(BaseOperator): + """ + Initiate an Amazon Glacier inventory-retrieval job + + .. 
seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GlacierCreateJobOperator` + + + :param vault_name: the Glacier vault on which job is executed + :type vault_name: str + """ + + template_fields = ("vault_name",) + + @apply_defaults + def __init__( + self, *, aws_conn_id="aws_default", vault_name: str, **kwargs, + ): + super().__init__(**kwargs) + self.aws_conn_id = aws_conn_id + self.vault_name = vault_name + + def execute(self, context): + hook = GlacierHook(aws_conn_id=self.aws_conn_id) + response = hook.retrieve_inventory(vault_name=self.vault_name) + return response + + +class GlacierDownloadArchive(BaseOperator): + """ + This operator retrieves results from GlacierCreateJobOperator + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GlacierDownloadArchive` + + + :param vault_name: the Glacier vault on which job is executed + :type vault_name: string + :param job_id: the job ID was returned by retrieve_inventory() + :type job_id: str + """ + + template_fields = ("vault_name", "job_id") + + @apply_defaults + def __init__( + self, *, aws_conn_id="aws_default", vault_name: str, job_id: str, **kwargs, + ): + super().__init__(**kwargs) + self.aws_conn_id = aws_conn_id + self.vault_name = vault_name + self.job_id = job_id + + def execute(self, context): + hook = GlacierHook(aws_conn_id="aws_default") + response = hook.retrieve_inventory_results(vault_name=self.vault_name, job_id=self.job_id) + return response + + +class GlacierTransferDataToGCS(BaseOperator): + """ + This operator transfers data from Glacier to Google Cloud Storage + + .. 
seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GlacierTransferDataToGCS` + + :param vault_name: the Glacier vault on which job is executed + :type vault_name: string + :param bucket_name: the Google Cloud Storage bucket where the data will be transferred + :type bucket_name: str + :param object_name: the name of the object to check in the Google cloud + storage bucket. + :type object_name: str + :param gzip: option to compress local file or file data for upload + :type gzip: bool + :param google_impersonation_chain: Optional Google service account to impersonate using + short-term credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). 
+ :type google_impersonation_chain: Union[str, Sequence[str]] + """ + + template_fields = ("vault_name", "bucket_name", "object_name") + + @apply_defaults + def __init__( + self, + *, + aws_conn_id="aws_default", + vault_name: str, + gcp_conn_id="google_cloud_default", + delegate_to=None, + google_impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + bucket_name: str, + object_name: str, + gzip: bool, + **kwargs, + ): + super().__init__(**kwargs) + self.aws_conn_id = aws_conn_id + self.vault_name = vault_name + self.gcp_conn_id = gcp_conn_id + self.delegate_to = delegate_to + self.impersonation_chain = google_impersonation_chain + self.bucket_name = bucket_name + self.object_name = object_name + self.gzip = gzip + + def execute(self, context): + glacier_hook = GlacierHook(aws_conn_id=self.aws_conn_id) + gcs_hook = GCSHook( + gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to, + impersonation_chain=self.impersonation_chain, + ) + + # todo: should I check if vault exists? Review comment: Looks good, let's remove this comment ########## File path: tests/providers/amazon/aws/hooks/test_glacier.py ########## @@ -0,0 +1,173 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import unittest + +import mock +import pytest +from botocore.exceptions import ClientError +from testfixtures import LogCapture + +from airflow.providers.amazon.aws.hooks.glacier import GlacierHook + +CREDENTIALS = "aws_conn" +VAULT_NAME = "airflow" +JOB_ID = "1234abcd" +REQUEST_RESULT = {"jobId": "1234abcd"} +RESPONSE_BODY = {"body": "data"} +JOB_STATUS = {"Action": "", "StatusCode": "Succeeded"} + + +class TestAmazonGlacierHook(unittest.TestCase): + def setUp(self): + with mock.patch("airflow.providers.amazon.aws.hooks.glacier.GlacierHook.__init__", return_value=None): + self.hook = GlacierHook(aws_conn_id="aws_default") + + @mock.patch("airflow.providers.amazon.aws.hooks.glacier.GlacierHook.get_conn") + def test_retrieve_inventory_should_be_called_once_with(self, mock_conn): + # when + self.hook.retrieve_inventory(VAULT_NAME) + # then + mock_conn.assert_called_once_with() + + @mock.patch("airflow.providers.amazon.aws.hooks.glacier.GlacierHook.get_conn") + def test_retrieve_inventory_should_return_job_id(self, mock_conn): + # Given + job_id = {"jobId": "1234abcd"} + # when + mock_conn.return_value.initiate_job.return_value = job_id + result = self.hook.retrieve_inventory(VAULT_NAME) + # then + self.assertEqual(job_id, result) + + @mock.patch( + "airflow.providers.amazon.aws.hooks.glacier.GlacierHook.get_conn", side_effect=[ClientError({}, {})] + ) + def test_retrieve_inventory_should_raise_client_error_when_error_occurred(self, mock_conn): + with pytest.raises(ClientError): + self.hook.retrieve_inventory(VAULT_NAME) + + @mock.patch("airflow.providers.amazon.aws.hooks.glacier.GlacierHook.get_conn") + def test_retrieve_inventory_should_log_mgs(self, mock_conn): + # given + job_id = {"jobId": "1234abcd"} + + # when + with LogCapture() as log: + mock_conn.return_value.initiate_job.return_value = job_id + self.hook.retrieve_inventory(VAULT_NAME) + + # then + log.check( + ( + 'airflow.providers.amazon.aws.hooks.glacier.GlacierHook', + 'INFO', + f"Retrieving 
inventory for vault: {VAULT_NAME}", + ), + ( + 'airflow.providers.amazon.aws.hooks.glacier.GlacierHook', + 'INFO', + f"Initiated inventory-retrieval job for: {VAULT_NAME}", + ), + ( + 'airflow.providers.amazon.aws.hooks.glacier.GlacierHook', + 'INFO', + f"Retrieval Job ID: {job_id.get('jobId')}", + ), + ) + + @mock.patch("airflow.providers.amazon.aws.hooks.glacier.GlacierHook.get_conn") + def test_retrieve_inventory_results_should_be_called_once_with(self, mock_conn): + # when + self.hook.retrieve_inventory_results(vault_name=VAULT_NAME, job_id=JOB_ID) + # then + mock_conn.assert_called_once_with() + + @mock.patch("airflow.providers.amazon.aws.hooks.glacier.GlacierHook.get_conn") + def test_retrieve_inventory_results_should_return_(self, mock_conn): + # when + mock_conn.return_value.get_job_output.return_value = RESPONSE_BODY + response = self.hook.retrieve_inventory_results(VAULT_NAME, JOB_ID) + # then + self.assertEqual(response, RESPONSE_BODY) Review comment: I would say there is no need for two assertions. From a logical point of view, if `test_retrieve_inventory_results_should_return_` works as expected, it also means that the method was called. Plus, why does the test name end with `_`? 😉 ########## File path: airflow/providers/amazon/aws/transfers/glacier_to_gcs.py ########## @@ -0,0 +1,169 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import tempfile +from typing import Optional, Union, Sequence + +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.glacier import GlacierHook +from airflow.providers.google.cloud.hooks.gcs import GCSHook +from airflow.utils.decorators import apply_defaults + + +class GlacierCreateJobOperator(BaseOperator): + """ + Initiate an Amazon Glacier inventory-retrieval job + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GlacierCreateJobOperator` + + + :param vault_name: the Glacier vault on which job is executed + :type vault_name: str + """ + + template_fields = ("vault_name",) + + @apply_defaults + def __init__( + self, + *, + aws_conn_id="aws_default", + vault_name: str, + **kwargs, + ): + super().__init__(**kwargs) + self.aws_conn_id = aws_conn_id + self.vault_name = vault_name + + def execute(self, context): + hook = GlacierHook(aws_conn_id=self.aws_conn_id) + response = hook.retrieve_inventory(vault_name=self.vault_name) + return response Review comment: ```suggestion self.xcom_push(context, key="job_id", value=response["StatusCode"]) return response ``` ########## File path: airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py ########## @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import os + +from airflow import models +from airflow.providers.amazon.aws.sensors.glacier import GlacierJobOperationSensor +from airflow.providers.amazon.aws.transfers.glacier_to_gcs import ( + GlacierCreateJobOperator, + GlacierDownloadArchive, + GlacierTransferDataToGCS, +) +from airflow.utils.dates import days_ago + +VAULT_NAME = "airflow" +BUCKET_NAME = os.environ.get("GCS_BUCKET_NAME", "gs://glacier_bucket") + +with models.DAG( + "example_glacier_to_gcs", schedule_interval=None, start_date=days_ago(1), # Override to match your needs +) as dag: + # [START howto_glacier_create_job_operator] + create_glacier_job = GlacierCreateJobOperator( + task_id="create_glacier_job", aws_conn_id="aws_default", vault_name=VAULT_NAME, + ) + JOB_ID = '{{ task_instance.xcom_pull("create_glacier_job")["jobId"] }}' Review comment: This will be required: https://github.com/apache/airflow/pull/10947#discussion_r491907736 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
