This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cd3fa33e82 DynamoDBToS3Operator - Add a feature to export the table to a point in time. (#31142)
cd3fa33e82 is described below

commit cd3fa33e82922e01888d609ed9c24b9c2dadfa27
Author: Utkarsh Sharma <[email protected]>
AuthorDate: Wed May 10 05:26:29 2023 +0530

    DynamoDBToS3Operator - Add a feature to export the table to a point in time. (#31142)
    
    Co-authored-by: D. Ferruzzi <[email protected]>
---
 .../amazon/aws/transfers/dynamodb_to_s3.py         | 47 ++++++++++++++++--
 airflow/providers/amazon/aws/waiters/README.md     |  2 +
 airflow/providers/amazon/aws/waiters/dynamodb.json | 30 ++++++++++++
 .../transfer/dynamodb_to_s3.rst                    |  8 +++
 .../amazon/aws/transfers/test_dynamodb_to_s3.py    | 41 ++++++++++++++++
 .../amazon/aws/waiters/test_custom_waiters.py      | 57 ++++++++++++++++++++++
 .../providers/amazon/aws/example_dynamodb_to_s3.py | 12 +++++
 7 files changed, 194 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
index bd2034893e..c8eee0b9f5 100644
--- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -23,12 +23,14 @@ from __future__ import annotations
 
 import json
 from copy import copy
+from datetime import datetime
 from decimal import Decimal
 from os.path import getsize
 from tempfile import NamedTemporaryFile
 from typing import IO, TYPE_CHECKING, Any, Callable, Sequence
 from uuid import uuid4
 
+from airflow.compat.functools import cached_property
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
@@ -87,6 +89,10 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
     :param dynamodb_scan_kwargs: kwargs passed to <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan>
     :param s3_key_prefix: Prefix of s3 object key
     :param process_func: How we transform a dynamodb item to bytes. By default we dump the JSON
+    :param export_time: Time in the past from which to export table data, counted in seconds from the start of
+     the Unix epoch. The table export will be a snapshot of the table's state at this point in time.
+    :param export_format: The format for the exported data. Valid values for ExportFormat are DYNAMODB_JSON
+     or ION.
     """
 
     template_fields: Sequence[str] = (
@@ -109,6 +115,8 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
         dynamodb_scan_kwargs: dict[str, Any] | None = None,
         s3_key_prefix: str = "",
+        process_func: Callable[[dict[str, Any]], bytes] = _convert_item_to_json_bytes,
+        export_time: datetime | None = None,
+        export_format: str = "DYNAMODB_JSON",
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -118,11 +126,44 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
         self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
         self.s3_bucket_name = s3_bucket_name
         self.s3_key_prefix = s3_key_prefix
+        self.export_time = export_time
+        self.export_format = export_format
 
-    def execute(self, context: Context) -> None:
-        hook = DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
-        table = hook.get_conn().Table(self.dynamodb_table_name)
+        if self.export_time and self.export_time > datetime.now():
+            raise ValueError("The export_time parameter cannot be a future time.")
+
+    @cached_property
+    def hook(self):
+        """Create DynamoDBHook"""
+        return DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
 
+    def execute(self, context: Context) -> None:
+        if self.export_time:
+            self._export_table_to_point_in_time()
+        else:
+            self._export_entire_data()
+
+    def _export_table_to_point_in_time(self):
+        """
+        Export data from start of epoc till `export_time`. Table export will 
be a snapshot of the table's
+         state at this point in time.
+        """
+        client = self.hook.conn.meta.client
+        table_description = client.describe_table(TableName=self.dynamodb_table_name)
+        response = client.export_table_to_point_in_time(
+            TableArn=table_description.get("Table", {}).get("TableArn"),
+            ExportTime=self.export_time,
+            S3Bucket=self.s3_bucket_name,
+            S3Prefix=self.s3_key_prefix,
+            ExportFormat=self.export_format,
+        )
+        waiter = self.hook.get_waiter("export_table")
+        export_arn = response.get("ExportDescription", {}).get("ExportArn")
+        waiter.wait(ExportArn=export_arn)
+
+    def _export_entire_data(self):
+        """Export all data from the table."""
+        table = self.hook.get_conn().Table(self.dynamodb_table_name)
         scan_kwargs = copy(self.dynamodb_scan_kwargs) if self.dynamodb_scan_kwargs else {}
         err = None
         f: IO[Any]
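
A note on prerequisites, not covered by this diff: the ExportTableToPointInTime API only works on tables that have point-in-time recovery (PITR) enabled. A minimal sketch of enabling it with plain boto3, assuming default credentials and reusing the hypothetical table name from the tests below:

    import boto3

    # PITR must be enabled on the source table before
    # export_table_to_point_in_time can succeed.
    client = boto3.client("dynamodb")
    client.update_continuous_backups(
        TableName="airflow_rocks",  # hypothetical table name
        PointInTimeRecoverySpecification={"PointInTimeRecoveryEnabled": True},
    )
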
diff --git a/airflow/providers/amazon/aws/waiters/README.md b/airflow/providers/amazon/aws/waiters/README.md
index d6e8958b8e..d70b969c20 100644
--- a/airflow/providers/amazon/aws/waiters/README.md
+++ b/airflow/providers/amazon/aws/waiters/README.md
@@ -98,3 +98,5 @@ EksHook().get_waiter("all_nodegroups_deleted").wait(clusterName=cluster_name)
 
 Note that since the get_waiter is in the hook instead of on the client side, a custom waiter is
 just `hook.get_waiter` and not `hook.conn.get_waiter`.  Other than that, they should be identical.
+
+Note that custom waiters do not work with resource_type; only client_type is supported.
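
For illustration, the new DynamoDB waiter follows the same pattern as the EKS example above. A sketch, assuming export_arn holds the ARN returned by export_table_to_point_in_time:

    from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook

    # Custom waiters hang off hook.get_waiter, not hook.conn.get_waiter.
    waiter = DynamoDBHook(aws_conn_id=None).get_waiter("export_table")
    waiter.wait(ExportArn=export_arn)
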
diff --git a/airflow/providers/amazon/aws/waiters/dynamodb.json b/airflow/providers/amazon/aws/waiters/dynamodb.json
new file mode 100644
index 0000000000..acd23268ab
--- /dev/null
+++ b/airflow/providers/amazon/aws/waiters/dynamodb.json
@@ -0,0 +1,30 @@
+{
+    "version": 2,
+    "waiters": {
+        "export_table": {
+            "operation": "DescribeExport",
+            "delay": 30,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "matcher": "path",
+                    "expected": "COMPLETED",
+                    "argument": "ExportDescription.ExportStatus",
+                    "state": "success"
+                },
+                {
+                    "matcher": "path",
+                    "expected": "FAILED",
+                    "argument": "ExportDescription.ExportStatus",
+                    "state": "failure"
+                },
+                {
+                    "matcher": "path",
+                    "expected": "IN_PROGRESS",
+                    "argument": "ExportDescription.ExportStatus",
+                    "state": "retry"
+                }
+            ]
+        }
+    }
+}
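
With "delay": 30 and "maxAttempts": 60, this waiter polls DescribeExport for up to roughly 30 minutes (60 attempts x 30 s) before timing out. Both values can be overridden per call through botocore's WaiterConfig; a sketch, reusing the waiter from the sketch above:

    # Poll every 60 seconds for up to two hours instead of the JSON defaults.
    waiter.wait(
        ExportArn=export_arn,
        WaiterConfig={"Delay": 60, "MaxAttempts": 120},
    )
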
diff --git a/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst b/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst
index 508f3d4fbc..764e63f96d 100644
--- a/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst
+++ b/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst
@@ -63,6 +63,14 @@ To parallelize the replication, users can create multiple ``DynamoDBToS3Operator
     :start-after: [START howto_transfer_dynamodb_to_s3_segmented]
     :end-before: [END howto_transfer_dynamodb_to_s3_segmented]
 
+Users can also pass the ``export_time`` param to ``DynamoDBToS3Operator`` to recover data from a point in time.
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_transfer_dynamodb_to_s3_in_some_point_in_time]
+    :end-before: [END howto_transfer_dynamodb_to_s3_in_some_point_in_time]
+
 Reference
 ---------
 
diff --git a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
index e5dd9f607f..5bb02448f2 100644
--- a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
@@ -316,3 +316,44 @@ class TestDynamodbToS3:
         assert "2020-01-01" == getattr(operator, "s3_bucket_name")
         assert "2020-01-01" == getattr(operator, "dynamodb_table_name")
         assert "2020-01-01" == getattr(operator, "s3_key_prefix")
+
+    @patch("airflow.providers.amazon.aws.transfers.dynamodb_to_s3.DynamoDBToS3Operator._export_entire_data")
+    def test_dynamodb_execute_calling_export_entire_data(self, _export_entire_data):
+        """Test that DynamoDBToS3Operator, when called without export_time, calls _export_entire_data."""
+        dynamodb_to_s3_operator = DynamoDBToS3Operator(
+            task_id="dynamodb_to_s3",
+            dynamodb_table_name="airflow_rocks",
+            s3_bucket_name="airflow-bucket",
+            file_size=4000,
+        )
+        dynamodb_to_s3_operator.execute(context={})
+        _export_entire_data.assert_called()
+
+    @patch(
+        "airflow.providers.amazon.aws.transfers.dynamodb_to_s3.DynamoDBToS3Operator."
+        "_export_table_to_point_in_time"
+    )
+    def test_dynamodb_execute_calling_export_table_to_point_in_time(self, _export_table_to_point_in_time):
+        """Test that DynamoDBToS3Operator, when called with export_time, calls
+        _export_table_to_point_in_time, which implements point-in-time recovery logic."""
+        dynamodb_to_s3_operator = DynamoDBToS3Operator(
+            task_id="dynamodb_to_s3",
+            dynamodb_table_name="airflow_rocks",
+            s3_bucket_name="airflow-bucket",
+            file_size=4000,
+            export_time=datetime(year=1983, month=1, day=1),
+        )
+        dynamodb_to_s3_operator.execute(context={})
+        _export_table_to_point_in_time.assert_called()
+
+    def test_dynamodb_with_future_date(self):
+        """Test that DynamoDBToS3Operator should raise a exception when future 
date is passed in
+        export_time parameter"""
+        with pytest.raises(ValueError, match="The export_time parameter cannot 
be a future time."):
+            DynamoDBToS3Operator(
+                task_id="dynamodb_to_s3",
+                dynamodb_table_name="airflow_rocks",
+                s3_bucket_name="airflow-bucket",
+                file_size=4000,
+                export_time=datetime(year=3000, month=1, day=1),
+            )
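
Worth noting: the future-date guard runs in the operator's constructor, so a bad export_time fails at DAG parse time rather than when the task executes. A sketch of the failing construction, using the hypothetical names from the tests:

    from datetime import datetime

    from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator

    # Raises ValueError immediately, before execute() is ever called.
    DynamoDBToS3Operator(
        task_id="dynamodb_to_s3",
        dynamodb_table_name="airflow_rocks",
        s3_bucket_name="airflow-bucket",
        export_time=datetime(year=3000, month=1, day=1),
    )
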
diff --git a/tests/providers/amazon/aws/waiters/test_custom_waiters.py b/tests/providers/amazon/aws/waiters/test_custom_waiters.py
index 5531b6a801..227ca6be42 100644
--- a/tests/providers/amazon/aws/waiters/test_custom_waiters.py
+++ b/tests/providers/amazon/aws/waiters/test_custom_waiters.py
@@ -26,6 +26,7 @@ from botocore.exceptions import WaiterError
 from botocore.waiter import WaiterModel
 from moto import mock_eks
 
+from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
 from airflow.providers.amazon.aws.hooks.ecs import EcsClusterStates, EcsHook, EcsTaskDefinitionStates
 from airflow.providers.amazon.aws.hooks.eks import EksHook
 from airflow.providers.amazon.aws.waiters.base_waiter import BaseBotoWaiter
@@ -249,3 +250,59 @@ class TestCustomECSServiceWaiters:
         ]
         waiter = EcsHook(aws_conn_id=None).get_waiter("task_definition_inactive")
         waiter.wait(taskDefinition="spam-egg", WaiterConfig={"Delay": 0.01, "MaxAttempts": 3})
+
+
+class TestCustomDynamoDBServiceWaiters:
+    """Test waiters from ``amazon/aws/waiters/dynamodb.json``."""
+
+    STATUS_COMPLETED = "COMPLETED"
+    STATUS_FAILED = "FAILED"
+    STATUS_IN_PROGRESS = "IN_PROGRESS"
+
+    @pytest.fixture(autouse=True)
+    def setup_test_cases(self, monkeypatch):
+        self.client = boto3.client("dynamodb", region_name="eu-west-3")
+        monkeypatch.setattr(DynamoDBHook, "conn", self.client)
+
+    @pytest.fixture
+    def mock_describe_export(self):
+        """Mock ``DynamoDBHook.Client.describe_export`` method."""
+        with mock.patch.object(self.client, "describe_export") as m:
+            yield m
+
+    def test_service_waiters(self):
+        hook_waiters = DynamoDBHook(aws_conn_id=None).list_waiters()
+        assert "export_table" in hook_waiters
+
+    @staticmethod
+    def describe_export(status: str):
+        """
+        Helper function to generate a minimal DescribeExport response for a single job.
+        https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DescribeExport.html
+        """
+        return {"ExportDescription": {"ExportStatus": status}}
+
+    def test_export_table_to_point_in_time_completed(self, mock_describe_export):
+        """Test state transition from `in progress` to `completed` during init."""
+        with mock.patch("boto3.client") as client:
+            client.return_value = self.client
+            waiter = DynamoDBHook(aws_conn_id=None).get_waiter("export_table", client=self.client)
+            mock_describe_export.side_effect = [
+                self.describe_export(self.STATUS_IN_PROGRESS),
+                self.describe_export(self.STATUS_COMPLETED),
+            ]
+            waiter.wait(
+                ExportArn="LoremIpsumissimplydummytextoftheprintingandtypesettingindustry",
+            )
+
+    def test_export_table_to_point_in_time_failed(self, mock_describe_export):
+        """Test state transition from `in progress` to `failed` during init."""
+        with mock.patch("boto3.client") as client:
+            client.return_value = self.client
+            mock_describe_export.side_effect = [
+                self.describe_export(self.STATUS_IN_PROGRESS),
+                self.describe_export(self.STATUS_FAILED),
+            ]
+            waiter = DynamoDBHook(aws_conn_id=None).get_waiter("export_table", client=self.client)
+            with pytest.raises(WaiterError, match='we matched expected path: "FAILED"'):
+                waiter.wait(ExportArn="LoremIpsumissimplydummytextoftheprintingandtypesettingindustry")
diff --git a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
index b56efaf2ce..bb2908b37d 100644
--- a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
@@ -127,6 +127,17 @@ with DAG(
         },
     )
     # [END howto_transfer_dynamodb_to_s3_segmented]
+
+    # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time]
+    backup_db_to_point_in_time = DynamoDBToS3Operator(
+        task_id="backup_db_to_point_in_time",
+        dynamodb_table_name=table_name,
+        file_size=1000,
+        s3_bucket_name=bucket_name,
+        export_time=datetime(year=2023, month=4, day=10),
+    )
+    # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time]
+
     delete_table = delete_dynamodb_table(table_name=table_name)
 
     delete_bucket = S3DeleteBucketOperator(
@@ -146,6 +157,7 @@ with DAG(
         backup_db,
         backup_db_segment_1,
         backup_db_segment_2,
+        backup_db_to_point_in_time,
         # TEST TEARDOWN
         delete_table,
         delete_bucket,
