This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e8dcdc4ff Add GlacierUploadArchiveOperator (#26652)
0e8dcdc4ff is described below

commit 0e8dcdc4ffbc33da5bc6864fdb16d2d01bdfdcaf
Author: Pankaj Singh <[email protected]>
AuthorDate: Fri Oct 21 23:22:49 2022 +0530

    Add GlacierUploadArchiveOperator (#26652)
    
    * Add GlacierUploadArchiveOperator
---
 .../aws/example_dags/example_glacier_to_gcs.py     | 13 +++++-
 airflow/providers/amazon/aws/operators/glacier.py  | 50 ++++++++++++++++++++++
 .../operators/glacier.rst                          | 14 ++++++
 docs/spelling_wordlist.txt                         |  1 +
 .../providers/amazon/aws/operators/test_glacier.py | 17 +++++++-
 5 files changed, 92 insertions(+), 3 deletions(-)

diff --git 
a/airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py 
b/airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py
index 593688dcbb..58c4d21e1c 100644
--- a/airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py
+++ b/airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py
@@ -20,7 +20,10 @@ import os
 from datetime import datetime
 
 from airflow import DAG
-from airflow.providers.amazon.aws.operators.glacier import 
GlacierCreateJobOperator
+from airflow.providers.amazon.aws.operators.glacier import (
+    GlacierCreateJobOperator,
+    GlacierUploadArchiveOperator,
+)
 from airflow.providers.amazon.aws.sensors.glacier import 
GlacierJobOperationSensor
 from airflow.providers.amazon.aws.transfers.glacier_to_gcs import 
GlacierToGCSOperator
 
@@ -46,6 +49,12 @@ with DAG(
     )
     # [END howto_sensor_glacier_job_operation]
 
+    # [START howto_operator_glacier_upload_archive]
+    upload_archive_to_glacier = GlacierUploadArchiveOperator(
+        vault_name=VAULT_NAME, body=b'Test Data', 
task_id="upload_data_to_glacier"
+    )
+    # [END howto_operator_glacier_upload_archive]
+
     # [START howto_transfer_glacier_to_gcs]
     transfer_archive_to_gcs = GlacierToGCSOperator(
         task_id="transfer_archive_to_gcs",
@@ -60,4 +69,4 @@ with DAG(
     )
     # [END howto_transfer_glacier_to_gcs]
 
-    create_glacier_job >> wait_for_operation_complete >> 
transfer_archive_to_gcs
+    create_glacier_job >> wait_for_operation_complete >> 
upload_archive_to_glacier >> transfer_archive_to_gcs
diff --git a/airflow/providers/amazon/aws/operators/glacier.py 
b/airflow/providers/amazon/aws/operators/glacier.py
index 6d96a5de7f..4e7c8b5e17 100644
--- a/airflow/providers/amazon/aws/operators/glacier.py
+++ b/airflow/providers/amazon/aws/operators/glacier.py
@@ -54,3 +54,53 @@ class GlacierCreateJobOperator(BaseOperator):
     def execute(self, context: Context):
         hook = GlacierHook(aws_conn_id=self.aws_conn_id)
         return hook.retrieve_inventory(vault_name=self.vault_name)
+
+
+class GlacierUploadArchiveOperator(BaseOperator):
+    """
+    This operator adds an archive to an Amazon S3 Glacier vault.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GlacierUploadArchiveOperator`
+
+    :param vault_name: The name of the vault
+    :param body: A bytes or seekable file-like object. The data to upload.
+    :param checksum: The SHA256 tree hash of the data being uploaded.
+        This parameter is automatically populated if it is not provided
+    :param archive_description: The description of the archive you are 
uploading
+    :param account_id: (Optional) AWS account ID of the account that owns the 
vault.
+        Defaults to the credentials used to sign the request
+    :param aws_conn_id: The reference to the AWS connection details
+    """
+
+    template_fields: Sequence[str] = ("vault_name",)
+
+    def __init__(
+        self,
+        *,
+        vault_name: str,
+        body: object,
+        checksum: str | None = None,
+        archive_description: str | None = None,
+        account_id: str | None = None,
+        aws_conn_id="aws_default",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.aws_conn_id = aws_conn_id
+        self.account_id = account_id
+        self.vault_name = vault_name
+        self.body = body
+        self.checksum = checksum
+        self.archive_description = archive_description
+
+    def execute(self, context: Context):
+        hook = GlacierHook(aws_conn_id=self.aws_conn_id)
+        return hook.get_conn().upload_archive(
+            accountId=self.account_id,
+            vaultName=self.vault_name,
+            archiveDescription=self.archive_description,
+            body=self.body,
+            checksum=self.checksum,
+        )
diff --git a/docs/apache-airflow-providers-amazon/operators/glacier.rst 
b/docs/apache-airflow-providers-amazon/operators/glacier.rst
index df9b0fcf8b..c1c40525a5 100644
--- a/docs/apache-airflow-providers-amazon/operators/glacier.rst
+++ b/docs/apache-airflow-providers-amazon/operators/glacier.rst
@@ -46,6 +46,20 @@ This Operator returns a dictionary of information related to 
the initiated job s
     :start-after: [START howto_operator_glacier_create_job]
     :end-before: [END howto_operator_glacier_create_job]
 
+.. _howto/operator:GlacierUploadArchiveOperator:
+
+Upload an archive to Amazon S3 Glacier
+======================================
+
+To add an archive to an Amazon S3 Glacier vault,
+use :class:`~airflow.providers.amazon.aws.operators.glacier.GlacierUploadArchiveOperator`
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_glacier_upload_archive]
+    :end-before: [END howto_operator_glacier_upload_archive]
+
 Sensors
 -------
 
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 181a1079f4..2e67be35b9 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1253,6 +1253,7 @@ securityManager
 seealso
 Seedlist
 seedlist
+seekable
 segmentGranularity
 Sendgrid
 sendgrid
diff --git a/tests/providers/amazon/aws/operators/test_glacier.py 
b/tests/providers/amazon/aws/operators/test_glacier.py
index a1b0628301..51600eca0b 100644
--- a/tests/providers/amazon/aws/operators/test_glacier.py
+++ b/tests/providers/amazon/aws/operators/test_glacier.py
@@ -19,7 +19,10 @@ from __future__ import annotations
 
 from unittest import TestCase, mock
 
-from airflow.providers.amazon.aws.operators.glacier import 
GlacierCreateJobOperator
+from airflow.providers.amazon.aws.operators.glacier import (
+    GlacierCreateJobOperator,
+    GlacierUploadArchiveOperator,
+)
 
 AWS_CONN_ID = "aws_default"
 BUCKET_NAME = "airflow_bucket"
@@ -38,3 +41,15 @@ class TestGlacierCreateJobOperator(TestCase):
         op.execute(mock.MagicMock())
         hook_mock.assert_called_once_with(aws_conn_id=AWS_CONN_ID)
         
hook_mock.return_value.retrieve_inventory.assert_called_once_with(vault_name=VAULT_NAME)
+
+
+class TestGlacierUploadArchiveOperator(TestCase):
+    
@mock.patch("airflow.providers.amazon.aws.operators.glacier.GlacierHook.get_conn")
+    def test_execute(self, hook_mock):
+        op = GlacierUploadArchiveOperator(
+            aws_conn_id=AWS_CONN_ID, vault_name=VAULT_NAME, body=b'Test Data', 
task_id=TASK_ID
+        )
+        op.execute(mock.MagicMock())
+        hook_mock.return_value.upload_archive.assert_called_once_with(
+            accountId=None, vaultName=VAULT_NAME, archiveDescription=None, 
body=b'Test Data', checksum=None
+        )

Reply via email to