This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a70ee7209c Add incremental export and cross account export functionality in `DynamoDBToS3Operator` (#41304)
a70ee7209c is described below
commit a70ee7209cd8d4dabb8bc1b1057d79ff25a99bae
Author: Steven Shidi Zhou <[email protected]>
AuthorDate: Thu Aug 15 18:56:32 2024 +0200
Add incremental export and cross account export functionality in `DynamoDBToS3Operator` (#41304)
---------
Co-authored-by: Vincent <[email protected]>
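For the cross-account case (S3BucketOwner is required when the destination bucket is owned by another account), a rough usage sketch; the task id, table, bucket, account id and export time below are hypothetical, not part of this commit:

    from datetime import datetime, timezone

    from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator

    backup_to_other_account = DynamoDBToS3Operator(
        task_id="backup_db_cross_account",          # hypothetical task id
        dynamodb_table_name="my_table",             # hypothetical table
        s3_bucket_name="bucket-in-other-account",   # hypothetical destination bucket
        point_in_time_export=True,
        export_time=datetime(2024, 8, 1, tzinfo=timezone.utc),  # must not be in the future
        export_table_to_point_in_time_kwargs={
            # Account id of the bucket owner; required for a cross-account export.
            "S3BucketOwner": "123456789012",
        },
    )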
---
.../amazon/aws/transfers/dynamodb_to_s3.py | 51 +++++++++++++++++-----
.../transfer/dynamodb_to_s3.rst | 16 +++++--
.../amazon/aws/transfers/test_dynamodb_to_s3.py | 2 +
.../providers/amazon/aws/example_dynamodb_to_s3.py | 45 ++++++++++++++++---
4 files changed, 94 insertions(+), 20 deletions(-)
diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
index 81f0dd79b6..c40001bbff 100644
--- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -32,6 +32,7 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.transfers.base import AwsToAwsBaseOperator
+from airflow.utils.helpers import prune_dict
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -89,10 +90,13 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
<https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan>
:param s3_key_prefix: Prefix of s3 object key
:param process_func: How we transform a dynamodb item to bytes. By default, we dump the json
+ :param point_in_time_export: Boolean value indicating whether the operator uses 'scan' or 'point in time export'
:param export_time: Time in the past from which to export table data, counted in seconds from the start of
the Unix epoch. The table export will be a snapshot of the table's state at this point in time.
:param export_format: The format for the exported data. Valid values for ExportFormat are DYNAMODB_JSON
or ION.
+ :param export_table_to_point_in_time_kwargs: extra parameters for the boto3
+ `export_table_to_point_in_time` function call, e.g. `ExportType`, `IncrementalExportSpecification`
:param check_interval: The amount of time in seconds to wait between attempts. Only if ``export_time`` is
provided.
:param max_attempts: The maximum number of attempts to be made. Only if ``export_time`` is provided.
@@ -107,12 +111,14 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
"s3_key_prefix",
"export_time",
"export_format",
+ "export_table_to_point_in_time_kwargs",
"check_interval",
"max_attempts",
)
template_fields_renderers = {
"dynamodb_scan_kwargs": "json",
+ "export_table_to_point_in_time_kwargs": "json",
}
def __init__(
@@ -120,12 +126,14 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
*,
dynamodb_table_name: str,
s3_bucket_name: str,
- file_size: int,
+ file_size: int = 1000,
dynamodb_scan_kwargs: dict[str, Any] | None = None,
s3_key_prefix: str = "",
process_func: Callable[[dict[str, Any]], bytes] = _convert_item_to_json_bytes,
+ point_in_time_export: bool = False,
export_time: datetime | None = None,
export_format: str = "DYNAMODB_JSON",
+ export_table_to_point_in_time_kwargs: dict | None = None,
check_interval: int = 30,
max_attempts: int = 60,
**kwargs,
@@ -137,8 +145,10 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
self.s3_bucket_name = s3_bucket_name
self.s3_key_prefix = s3_key_prefix
+ self.point_in_time_export = point_in_time_export
self.export_time = export_time
self.export_format = export_format
+ self.export_table_to_point_in_time_kwargs = export_table_to_point_in_time_kwargs or {}
self.check_interval = check_interval
self.max_attempts = max_attempts
@@ -148,29 +158,50 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
return DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
def execute(self, context: Context) -> None:
- if self.export_time:
+ # There are two separate export-to-point-in-time configurations:
+ # 1. Full export, which takes the export_time arg.
+ # 2. Incremental export, which takes the incremental export settings via `export_table_to_point_in_time_kwargs`.
+ # Hence export_time alone cannot serve as the indicator for the `_export_table_to_point_in_time`
+ # function. This change introduces a new boolean that indicates whether the operator scans
+ # and exports the entire table or uses the point-in-time export functionality.
+ if self.point_in_time_export or self.export_time:
self._export_table_to_point_in_time()
else:
self._export_entire_data()
def _export_table_to_point_in_time(self):
"""
- Export data from start of epoc till `export_time`.
+ Export data to a point in time.
+ Full export exports data from the start of the epoch until `export_time`.
Table export will be a snapshot of the table's state at this point in time.
+
+ Incremental export exports the data changed between a start datetime (`ExportFromTime`) and an end datetime (`ExportToTime`).
+
+ Note: S3BucketOwner is a required parameter when exporting to an S3 bucket in another account.
"""
if self.export_time and self.export_time > datetime.now(self.export_time.tzinfo):
raise ValueError("The export_time parameter cannot be a future time.")
client = self.hook.conn.meta.client
table_description = client.describe_table(TableName=self.dynamodb_table_name)
- response = client.export_table_to_point_in_time(
- TableArn=table_description.get("Table", {}).get("TableArn"),
- ExportTime=self.export_time,
- S3Bucket=self.s3_bucket_name,
- S3Prefix=self.s3_key_prefix,
- ExportFormat=self.export_format,
- )
+
+ export_table_to_point_in_time_base_args = {
+ "TableArn": table_description.get("Table", {}).get("TableArn"),
+ "ExportTime": self.export_time,
+ "S3Bucket": self.s3_bucket_name,
+ "S3Prefix": self.s3_key_prefix,
+ "ExportFormat": self.export_format,
+ }
+ export_table_to_point_in_time_args = {
+ **export_table_to_point_in_time_base_args,
+ **self.export_table_to_point_in_time_kwargs,
+ }
+
+ args_filtered = prune_dict(export_table_to_point_in_time_args)
+
+ response = client.export_table_to_point_in_time(**args_filtered)
waiter = self.hook.get_waiter("export_table")
export_arn = response.get("ExportDescription", {}).get("ExportArn")
waiter.wait(
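Side note on the new `prune_dict` call: in its default mode it drops None-valued entries, so an unset `export_time` (as in a purely incremental export) never reaches the boto3 API as `ExportTime=None`. A rough sketch with illustrative values only:

    from airflow.utils.helpers import prune_dict

    args = {
        "TableArn": "arn:aws:dynamodb:us-east-1:111122223333:table/example",  # illustrative ARN
        "ExportTime": None,            # export_time not set
        "S3Bucket": "example-bucket",  # illustrative bucket
        "ExportFormat": "DYNAMODB_JSON",
    }

    # The None-valued "ExportTime" entry is removed; the remaining keys are kept.
    print(prune_dict(args))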
diff --git a/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst b/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst
index 764e63f96d..74c4b78cef 100644
--- a/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst
+++ b/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst
@@ -63,13 +63,23 @@ To parallelize the replication, users can create multiple ``DynamoDBToS3Operator
:start-after: [START howto_transfer_dynamodb_to_s3_segmented]
:end-before: [END howto_transfer_dynamodb_to_s3_segmented]
-Users can also pass in ``export_time`` param to ``DynamoDBToS3Operator`` to recover data from a point in time.
+Users can also pass in the ``point_in_time_export`` boolean param to ``DynamoDBToS3Operator`` to recover data from a point in time.
+
+Full export example usage:
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
+ :end-before: [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
+
+Incremental export example usage:
.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
:language: python
:dedent: 4
- :start-after: [START howto_transfer_dynamodb_to_s3_in_some_point_in_time]
- :end-before: [END howto_transfer_dynamodb_to_s3_in_some_point_in_time]
+ :start-after: [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]
+ :end-before: [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]
Reference
---------
diff --git a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
index ad9400c72a..d608464b90 100644
--- a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
@@ -348,6 +348,7 @@ class TestDynamodbToS3:
dynamodb_table_name="airflow_rocks",
s3_bucket_name="airflow-bucket",
file_size=4000,
+ point_in_time_export=True,
export_time=datetime(year=1983, month=1, day=1),
)
dynamodb_to_s3_operator.execute(context={})
@@ -362,5 +363,6 @@ class TestDynamodbToS3:
dynamodb_table_name="airflow_rocks",
s3_bucket_name="airflow-bucket",
file_size=4000,
+ point_in_time_export=True,
export_time=datetime(year=3000, month=1, day=1),
).execute(context={})
diff --git a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
index dc08e2d5b9..c39abd3b80 100644
--- a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
@@ -92,6 +92,15 @@ def get_export_time(table_name: str):
return r["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]["EarliestRestorableDateTime"]
+@task
+def get_latest_export_time(table_name: str):
+ r = boto3.client("dynamodb").describe_continuous_backups(
+ TableName=table_name,
+ )
+
+ return r["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]["LatestRestorableDateTime"]
+
+
@task
def wait_for_bucket(s3_bucket_name):
waiter = boto3.client("s3").get_waiter("bucket_exists")
@@ -162,18 +171,39 @@ with DAG(
# [END howto_transfer_dynamodb_to_s3_segmented]
export_time = get_export_time(table_name)
- # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time]
- backup_db_to_point_in_time = DynamoDBToS3Operator(
- task_id="backup_db_to_point_in_time",
+ latest_export_time = get_latest_export_time(table_name)
+ # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
+ backup_db_to_point_in_time_full_export = DynamoDBToS3Operator(
+ task_id="backup_db_to_point_in_time_full_export",
dynamodb_table_name=table_name,
- file_size=1000,
s3_bucket_name=bucket_name,
+ point_in_time_export=True,
export_time=export_time,
s3_key_prefix=f"{S3_KEY_PREFIX}-3-",
)
- # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time]
+ # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
+
+ # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]
+ backup_db_to_point_in_time_incremental_export = DynamoDBToS3Operator(
+ task_id="backup_db_to_point_in_time_incremental_export",
+ dynamodb_table_name=table_name,
+ s3_bucket_name=bucket_name,
+ point_in_time_export=True,
+ s3_key_prefix=f"{S3_KEY_PREFIX}-4-",
+ export_table_to_point_in_time_kwargs={
+ "ExportType": "INCREMENTAL_EXPORT",
+ "IncrementalExportSpecification": {
+ "ExportFromTime": export_time,
+ "ExportToTime": latest_export_time,
+ "ExportViewType": "NEW_AND_OLD_IMAGES",
+ },
+ },
+ )
+ # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]
+
# This operation can take a long time to complete
- backup_db_to_point_in_time.max_attempts = 90
+ backup_db_to_point_in_time_full_export.max_attempts = 90
+ backup_db_to_point_in_time_incremental_export.max_attempts = 90
delete_table = delete_dynamodb_table(table_name=table_name)
@@ -195,7 +225,8 @@ with DAG(
backup_db_segment_1,
backup_db_segment_2,
export_time,
- backup_db_to_point_in_time,
+ backup_db_to_point_in_time_full_export,
+ backup_db_to_point_in_time_incremental_export,
# TEST TEARDOWN
delete_table,
delete_bucket,