This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0e8dcdc4ff Add GlacierUploadArchiveOperator (#26652)
0e8dcdc4ff is described below
commit 0e8dcdc4ffbc33da5bc6864fdb16d2d01bdfdcaf
Author: Pankaj Singh <[email protected]>
AuthorDate: Fri Oct 21 23:22:49 2022 +0530
Add GlacierUploadArchiveOperator (#26652)
* Add GlacierUploadArchiveOperator
---
.../aws/example_dags/example_glacier_to_gcs.py | 13 +++++-
airflow/providers/amazon/aws/operators/glacier.py | 50 ++++++++++++++++++++++
.../operators/glacier.rst | 14 ++++++
docs/spelling_wordlist.txt | 1 +
.../providers/amazon/aws/operators/test_glacier.py | 17 +++++++-
5 files changed, 92 insertions(+), 3 deletions(-)
diff --git
a/airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py
b/airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py
index 593688dcbb..58c4d21e1c 100644
--- a/airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py
+++ b/airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py
@@ -20,7 +20,10 @@ import os
from datetime import datetime
from airflow import DAG
-from airflow.providers.amazon.aws.operators.glacier import
GlacierCreateJobOperator
+from airflow.providers.amazon.aws.operators.glacier import (
+ GlacierCreateJobOperator,
+ GlacierUploadArchiveOperator,
+)
from airflow.providers.amazon.aws.sensors.glacier import
GlacierJobOperationSensor
from airflow.providers.amazon.aws.transfers.glacier_to_gcs import
GlacierToGCSOperator
@@ -46,6 +49,12 @@ with DAG(
)
# [END howto_sensor_glacier_job_operation]
+ # [START howto_operator_glacier_upload_archive]
+ upload_archive_to_glacier = GlacierUploadArchiveOperator(
+ vault_name=VAULT_NAME, body=b'Test Data',
task_id="upload_data_to_glacier"
+ )
+ # [END howto_operator_glacier_upload_archive]
+
# [START howto_transfer_glacier_to_gcs]
transfer_archive_to_gcs = GlacierToGCSOperator(
task_id="transfer_archive_to_gcs",
@@ -60,4 +69,4 @@ with DAG(
)
# [END howto_transfer_glacier_to_gcs]
- create_glacier_job >> wait_for_operation_complete >>
transfer_archive_to_gcs
+ create_glacier_job >> wait_for_operation_complete >>
upload_archive_to_glacier >> transfer_archive_to_gcs
diff --git a/airflow/providers/amazon/aws/operators/glacier.py
b/airflow/providers/amazon/aws/operators/glacier.py
index 6d96a5de7f..4e7c8b5e17 100644
--- a/airflow/providers/amazon/aws/operators/glacier.py
+++ b/airflow/providers/amazon/aws/operators/glacier.py
@@ -54,3 +54,53 @@ class GlacierCreateJobOperator(BaseOperator):
def execute(self, context: Context):
hook = GlacierHook(aws_conn_id=self.aws_conn_id)
return hook.retrieve_inventory(vault_name=self.vault_name)
+
+
+class GlacierUploadArchiveOperator(BaseOperator):
+ """
+ This operator adds an archive to an Amazon S3 Glacier vault
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:GlacierUploadArchiveOperator`
+
+ :param vault_name: The name of the vault
+ :param body: A bytes or seekable file-like object. The data to upload.
+ :param checksum: The SHA256 tree hash of the data being uploaded.
+ This parameter is automatically populated if it is not provided
+ :param archive_description: The description of the archive you are
uploading
+ :param account_id: (Optional) AWS account ID of the account that owns the
vault.
+ Defaults to the credentials used to sign the request
+ :param aws_conn_id: The reference to the AWS connection details
+ """
+
+ template_fields: Sequence[str] = ("vault_name",)
+
+ def __init__(
+ self,
+ *,
+ vault_name: str,
+ body: object,
+ checksum: str | None = None,
+ archive_description: str | None = None,
+ account_id: str | None = None,
+ aws_conn_id="aws_default",
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.aws_conn_id = aws_conn_id
+ self.account_id = account_id
+ self.vault_name = vault_name
+ self.body = body
+ self.checksum = checksum
+ self.archive_description = archive_description
+
+ def execute(self, context: Context):
+ hook = GlacierHook(aws_conn_id=self.aws_conn_id)
+ return hook.get_conn().upload_archive(
+ accountId=self.account_id,
+ vaultName=self.vault_name,
+ archiveDescription=self.archive_description,
+ body=self.body,
+ checksum=self.checksum,
+ )
diff --git a/docs/apache-airflow-providers-amazon/operators/glacier.rst
b/docs/apache-airflow-providers-amazon/operators/glacier.rst
index df9b0fcf8b..c1c40525a5 100644
--- a/docs/apache-airflow-providers-amazon/operators/glacier.rst
+++ b/docs/apache-airflow-providers-amazon/operators/glacier.rst
@@ -46,6 +46,20 @@ This Operator returns a dictionary of information related to
the initiated job s
:start-after: [START howto_operator_glacier_create_job]
:end-before: [END howto_operator_glacier_create_job]
+.. _howto/operator:GlacierUploadArchiveOperator:
+
+Upload archive to an Amazon Glacier
+===================================
+
+To add an archive to an Amazon S3 Glacier vault
+use
:class:`~airflow.providers.amazon.aws.operators.glacier.GlacierUploadArchiveOperator`
+
+.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_glacier_upload_archive]
+ :end-before: [END howto_operator_glacier_upload_archive]
+
Sensors
-------
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 181a1079f4..2e67be35b9 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1253,6 +1253,7 @@ securityManager
seealso
Seedlist
seedlist
+seekable
segmentGranularity
Sendgrid
sendgrid
diff --git a/tests/providers/amazon/aws/operators/test_glacier.py
b/tests/providers/amazon/aws/operators/test_glacier.py
index a1b0628301..51600eca0b 100644
--- a/tests/providers/amazon/aws/operators/test_glacier.py
+++ b/tests/providers/amazon/aws/operators/test_glacier.py
@@ -19,7 +19,10 @@ from __future__ import annotations
from unittest import TestCase, mock
-from airflow.providers.amazon.aws.operators.glacier import
GlacierCreateJobOperator
+from airflow.providers.amazon.aws.operators.glacier import (
+ GlacierCreateJobOperator,
+ GlacierUploadArchiveOperator,
+)
AWS_CONN_ID = "aws_default"
BUCKET_NAME = "airflow_bucket"
@@ -38,3 +41,15 @@ class TestGlacierCreateJobOperator(TestCase):
op.execute(mock.MagicMock())
hook_mock.assert_called_once_with(aws_conn_id=AWS_CONN_ID)
hook_mock.return_value.retrieve_inventory.assert_called_once_with(vault_name=VAULT_NAME)
+
+
+class TestGlacierUploadArchiveOperator(TestCase):
+
@mock.patch("airflow.providers.amazon.aws.operators.glacier.GlacierHook.get_conn")
+ def test_execute(self, hook_mock):
+ op = GlacierUploadArchiveOperator(
+ aws_conn_id=AWS_CONN_ID, vault_name=VAULT_NAME, body=b'Test Data',
task_id=TASK_ID
+ )
+ op.execute(mock.MagicMock())
+ hook_mock.return_value.upload_archive.assert_called_once_with(
+ accountId=None, vaultName=VAULT_NAME, archiveDescription=None,
body=b'Test Data', checksum=None
+ )