This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cd3fa33e82 DynamoDBToS3Operator - Add a feature to export the table to a point in time. (#31142)
cd3fa33e82 is described below
commit cd3fa33e82922e01888d609ed9c24b9c2dadfa27
Author: Utkarsh Sharma <[email protected]>
AuthorDate: Wed May 10 05:26:29 2023 +0530
DynamoDBToS3Operator - Add a feature to export the table to a point in time. (#31142)
Co-authored-by: D. Ferruzzi <[email protected]>
---
.../amazon/aws/transfers/dynamodb_to_s3.py | 47 ++++++++++++++++--
airflow/providers/amazon/aws/waiters/README.md | 2 +
airflow/providers/amazon/aws/waiters/dynamodb.json | 30 ++++++++++++
.../transfer/dynamodb_to_s3.rst | 8 +++
.../amazon/aws/transfers/test_dynamodb_to_s3.py | 41 ++++++++++++++++
.../amazon/aws/waiters/test_custom_waiters.py | 57 ++++++++++++++++++++++
.../providers/amazon/aws/example_dynamodb_to_s3.py | 12 +++++
7 files changed, 194 insertions(+), 3 deletions(-)
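Before the diff, a quick illustration of what this change enables. This is a minimal sketch, not taken from the commit itself: the table and bucket names are placeholders, and it assumes point-in-time recovery is enabled on the source table, which the underlying DynamoDB export API requires.

    from datetime import datetime, timedelta

    from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator

    backup_to_point_in_time = DynamoDBToS3Operator(
        task_id="backup_db_to_point_in_time",
        dynamodb_table_name="my_table",        # hypothetical table name
        s3_bucket_name="my-backup-bucket",     # hypothetical bucket name
        # Snapshot of the table's state at this moment; must not be in the future.
        export_time=datetime.now() - timedelta(days=1),
        export_format="DYNAMODB_JSON",         # the default; "ION" is also valid
    )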
diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
index bd2034893e..c8eee0b9f5 100644
--- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -23,12 +23,14 @@ from __future__ import annotations
import json
from copy import copy
+from datetime import datetime
from decimal import Decimal
from os.path import getsize
from tempfile import NamedTemporaryFile
from typing import IO, TYPE_CHECKING, Any, Callable, Sequence
from uuid import uuid4
+from airflow.compat.functools import cached_property
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
@@ -87,6 +89,10 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
    :param dynamodb_scan_kwargs: kwargs passed to
        <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan>
    :param s3_key_prefix: Prefix of s3 object key
    :param process_func: How we transform a DynamoDB item to bytes. By default, we dump the JSON.
+    :param export_time: Time in the past from which to export table data, counted in seconds from the start of
+        the Unix epoch. The table export will be a snapshot of the table's state at this point in time.
+    :param export_format: The format for the exported data. Valid values for ExportFormat are DYNAMODB_JSON
+        or ION.
"""
template_fields: Sequence[str] = (
@@ -109,6 +115,8 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
        dynamodb_scan_kwargs: dict[str, Any] | None = None,
        s3_key_prefix: str = "",
        process_func: Callable[[dict[str, Any]], bytes] = _convert_item_to_json_bytes,
+        export_time: datetime | None = None,
+        export_format: str = "DYNAMODB_JSON",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
@@ -118,11 +126,44 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
        self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
        self.s3_bucket_name = s3_bucket_name
        self.s3_key_prefix = s3_key_prefix
+        self.export_time = export_time
+        self.export_format = export_format

-    def execute(self, context: Context) -> None:
-        hook = DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
-        table = hook.get_conn().Table(self.dynamodb_table_name)
+        if self.export_time and self.export_time > datetime.now():
+            raise ValueError("The export_time parameter cannot be a future time.")
+
+    @cached_property
+    def hook(self):
+        """Create a DynamoDBHook."""
+        return DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
+
+    def execute(self, context: Context) -> None:
+        if self.export_time:
+            self._export_table_to_point_in_time()
+        else:
+            self._export_entire_data()
+
+    def _export_table_to_point_in_time(self):
+        """
+        Export data from the start of the epoch until `export_time`. The table export will be a snapshot
+        of the table's state at that point in time.
+        """
+        client = self.hook.conn.meta.client
+        table_description = client.describe_table(TableName=self.dynamodb_table_name)
+        response = client.export_table_to_point_in_time(
+            TableArn=table_description.get("Table", {}).get("TableArn"),
+            ExportTime=self.export_time,
+            S3Bucket=self.s3_bucket_name,
+            S3Prefix=self.s3_key_prefix,
+            ExportFormat=self.export_format,
+        )
+        waiter = self.hook.get_waiter("export_table")
+        export_arn = response.get("ExportDescription", {}).get("ExportArn")
+        waiter.wait(ExportArn=export_arn)
+
+    def _export_entire_data(self):
+        """Export all data from the table."""
+        table = self.hook.get_conn().Table(self.dynamodb_table_name)
        scan_kwargs = copy(self.dynamodb_scan_kwargs) if self.dynamodb_scan_kwargs else {}
        err = None
        f: IO[Any]
diff --git a/airflow/providers/amazon/aws/waiters/README.md b/airflow/providers/amazon/aws/waiters/README.md
index d6e8958b8e..d70b969c20 100644
--- a/airflow/providers/amazon/aws/waiters/README.md
+++ b/airflow/providers/amazon/aws/waiters/README.md
@@ -98,3 +98,5 @@ EksHook().get_waiter("all_nodegroups_deleted").wait(clusterName=cluster_name)
Note that since the get_waiter is in the hook instead of on the client side, a custom waiter is
just `hook.get_waiter` and not `hook.conn.get_waiter`. Other than that, they should be identical.
+
+Note that custom waiters do not work with `resource_type`; only `client_type` is supported.
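To make that note concrete, here is a minimal sketch of calling the new custom waiter through the hook (the ARN value is a placeholder, not from this commit):

    from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook

    # The custom waiter lives on the hook, not on hook.conn (the boto3 client).
    waiter = DynamoDBHook(aws_conn_id=None).get_waiter("export_table")
    export_arn = "arn:aws:dynamodb:..."  # hypothetical export ARN
    # Polls DescribeExport until ExportStatus is COMPLETED (success) or FAILED (failure).
    waiter.wait(ExportArn=export_arn)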
diff --git a/airflow/providers/amazon/aws/waiters/dynamodb.json b/airflow/providers/amazon/aws/waiters/dynamodb.json
new file mode 100644
index 0000000000..acd23268ab
--- /dev/null
+++ b/airflow/providers/amazon/aws/waiters/dynamodb.json
@@ -0,0 +1,30 @@
+{
+    "version": 2,
+    "waiters": {
+        "export_table": {
+            "operation": "DescribeExport",
+            "delay": 30,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "matcher": "path",
+                    "expected": "COMPLETED",
+                    "argument": "ExportDescription.ExportStatus",
+                    "state": "success"
+                },
+                {
+                    "matcher": "path",
+                    "expected": "FAILED",
+                    "argument": "ExportDescription.ExportStatus",
+                    "state": "failure"
+                },
+                {
+                    "matcher": "path",
+                    "expected": "IN_PROGRESS",
+                    "argument": "ExportDescription.ExportStatus",
+                    "state": "retry"
+                }
+            ]
+        }
+    }
+}
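A waiter definition like the one above can be sanity-checked by loading it directly with botocore's WaiterModel; the hook's get_waiter builds on the same model. A sketch, assuming the repository root as the working directory:

    import json

    from botocore.waiter import WaiterModel

    with open("airflow/providers/amazon/aws/waiters/dynamodb.json") as f:
        model = WaiterModel(json.load(f))

    print(model.waiter_names)  # ['export_table']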
diff --git a/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst b/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst
index 508f3d4fbc..764e63f96d 100644
--- a/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst
+++ b/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst
@@ -63,6 +63,14 @@ To parallelize the replication, users can create multiple ``DynamoDBToS3Operator
    :start-after: [START howto_transfer_dynamodb_to_s3_segmented]
    :end-before: [END howto_transfer_dynamodb_to_s3_segmented]

+Users can also pass the ``export_time`` parameter to ``DynamoDBToS3Operator`` to recover data from a point in time.
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_transfer_dynamodb_to_s3_in_some_point_in_time]
+    :end-before: [END howto_transfer_dynamodb_to_s3_in_some_point_in_time]
+
Reference
---------
diff --git a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
index e5dd9f607f..5bb02448f2 100644
--- a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
@@ -316,3 +316,44 @@ class TestDynamodbToS3:
assert "2020-01-01" == getattr(operator, "s3_bucket_name")
assert "2020-01-01" == getattr(operator, "dynamodb_table_name")
assert "2020-01-01" == getattr(operator, "s3_key_prefix")
+
+    @patch("airflow.providers.amazon.aws.transfers.dynamodb_to_s3.DynamoDBToS3Operator._export_entire_data")
+    def test_dynamodb_execute_calling_export_entire_data(self, _export_entire_data):
+        """Test that DynamoDBToS3Operator called without export_time calls _export_entire_data."""
+        dynamodb_to_s3_operator = DynamoDBToS3Operator(
+            task_id="dynamodb_to_s3",
+            dynamodb_table_name="airflow_rocks",
+            s3_bucket_name="airflow-bucket",
+            file_size=4000,
+        )
+        dynamodb_to_s3_operator.execute(context={})
+        _export_entire_data.assert_called()
+
+    @patch(
+        "airflow.providers.amazon.aws.transfers.dynamodb_to_s3.DynamoDBToS3Operator."
+        "_export_table_to_point_in_time"
+    )
+    def test_dynamodb_execute_calling_export_table_to_point_in_time(self, _export_table_to_point_in_time):
+        """Test that DynamoDBToS3Operator called with export_time calls
+        _export_table_to_point_in_time, which implements the point-in-time recovery logic."""
+        dynamodb_to_s3_operator = DynamoDBToS3Operator(
+            task_id="dynamodb_to_s3",
+            dynamodb_table_name="airflow_rocks",
+            s3_bucket_name="airflow-bucket",
+            file_size=4000,
+            export_time=datetime(year=1983, month=1, day=1),
+        )
+        dynamodb_to_s3_operator.execute(context={})
+        _export_table_to_point_in_time.assert_called()
+
+    def test_dynamodb_with_future_date(self):
+        """Test that DynamoDBToS3Operator raises an exception when a future date is passed
+        in the export_time parameter."""
+        with pytest.raises(ValueError, match="The export_time parameter cannot be a future time."):
+            DynamoDBToS3Operator(
+                task_id="dynamodb_to_s3",
+                dynamodb_table_name="airflow_rocks",
+                s3_bucket_name="airflow-bucket",
+                file_size=4000,
+                export_time=datetime(year=3000, month=1, day=1),
+            )
diff --git a/tests/providers/amazon/aws/waiters/test_custom_waiters.py b/tests/providers/amazon/aws/waiters/test_custom_waiters.py
index 5531b6a801..227ca6be42 100644
--- a/tests/providers/amazon/aws/waiters/test_custom_waiters.py
+++ b/tests/providers/amazon/aws/waiters/test_custom_waiters.py
@@ -26,6 +26,7 @@ from botocore.exceptions import WaiterError
from botocore.waiter import WaiterModel
from moto import mock_eks
+from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
from airflow.providers.amazon.aws.hooks.ecs import EcsClusterStates, EcsHook, EcsTaskDefinitionStates
from airflow.providers.amazon.aws.hooks.eks import EksHook
from airflow.providers.amazon.aws.waiters.base_waiter import BaseBotoWaiter
@@ -249,3 +250,59 @@ class TestCustomECSServiceWaiters:
        ]
        waiter = EcsHook(aws_conn_id=None).get_waiter("task_definition_inactive")
        waiter.wait(taskDefinition="spam-egg", WaiterConfig={"Delay": 0.01, "MaxAttempts": 3})
+
+
+class TestCustomDynamoDBServiceWaiters:
+    """Test waiters from ``amazon/aws/waiters/dynamodb.json``."""
+
+    STATUS_COMPLETED = "COMPLETED"
+    STATUS_FAILED = "FAILED"
+    STATUS_IN_PROGRESS = "IN_PROGRESS"
+
+    @pytest.fixture(autouse=True)
+    def setup_test_cases(self, monkeypatch):
+        self.client = boto3.client("dynamodb", region_name="eu-west-3")
+        monkeypatch.setattr(DynamoDBHook, "conn", self.client)
+
+    @pytest.fixture
+    def mock_describe_export(self):
+        """Mock the ``describe_export`` method of the DynamoDB client."""
+        with mock.patch.object(self.client, "describe_export") as m:
+            yield m
+
+    def test_service_waiters(self):
+        hook_waiters = DynamoDBHook(aws_conn_id=None).list_waiters()
+        assert "export_table" in hook_waiters
+
+    @staticmethod
+    def describe_export(status: str):
+        """
+        Helper function to generate a minimal DescribeExport response for a single job.
+        https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DescribeExport.html
+        """
+        return {"ExportDescription": {"ExportStatus": status}}
+
+    def test_export_table_to_point_in_time_completed(self, mock_describe_export):
+        """Test the waiter on the state transition from `IN_PROGRESS` to `COMPLETED`."""
+        with mock.patch("boto3.client") as client:
+            client.return_value = self.client
+            waiter = DynamoDBHook(aws_conn_id=None).get_waiter("export_table", client=self.client)
+            mock_describe_export.side_effect = [
+                self.describe_export(self.STATUS_IN_PROGRESS),
+                self.describe_export(self.STATUS_COMPLETED),
+            ]
+            waiter.wait(
+                ExportArn="LoremIpsumissimplydummytextoftheprintingandtypesettingindustry",
+            )
+
+    def test_export_table_to_point_in_time_failed(self, mock_describe_export):
+        """Test the waiter on the state transition from `IN_PROGRESS` to `FAILED`."""
+        with mock.patch("boto3.client") as client:
+            client.return_value = self.client
+            mock_describe_export.side_effect = [
+                self.describe_export(self.STATUS_IN_PROGRESS),
+                self.describe_export(self.STATUS_FAILED),
+            ]
+            waiter = DynamoDBHook(aws_conn_id=None).get_waiter("export_table", client=self.client)
+            with pytest.raises(WaiterError, match='we matched expected path: "FAILED"'):
+                waiter.wait(ExportArn="LoremIpsumissimplydummytextoftheprintingandtypesettingindustry")
diff --git a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
index b56efaf2ce..bb2908b37d 100644
--- a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
@@ -127,6 +127,17 @@ with DAG(
        },
    )
    # [END howto_transfer_dynamodb_to_s3_segmented]
+
+    # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time]
+    backup_db_to_point_in_time = DynamoDBToS3Operator(
+        task_id="backup_db_to_point_in_time",
+        dynamodb_table_name=table_name,
+        file_size=1000,
+        s3_bucket_name=bucket_name,
+        export_time=datetime(year=2023, month=4, day=10),
+    )
+    # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time]
+
    delete_table = delete_dynamodb_table(table_name=table_name)
    delete_bucket = S3DeleteBucketOperator(
@@ -146,6 +157,7 @@ with DAG(
        backup_db,
        backup_db_segment_1,
        backup_db_segment_2,
+        backup_db_to_point_in_time,
        # TEST TEARDOWN
        delete_table,
        delete_bucket,