This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a70ee7209c Add incremental export and cross account export functionality in `DynamoDBToS3Operator` (#41304)
a70ee7209c is described below

commit a70ee7209cd8d4dabb8bc1b1057d79ff25a99bae
Author: Steven Shidi Zhou <[email protected]>
AuthorDate: Thu Aug 15 18:56:32 2024 +0200

    Add incremental export and cross account export functionality in `DynamoDBToS3Operator` (#41304)
    
    
    
    ---------
    
    Co-authored-by: Vincent <[email protected]>
---
 .../amazon/aws/transfers/dynamodb_to_s3.py         | 51 +++++++++++++++++-----
 .../transfer/dynamodb_to_s3.rst                    | 16 +++++--
 .../amazon/aws/transfers/test_dynamodb_to_s3.py    |  2 +
 .../providers/amazon/aws/example_dynamodb_to_s3.py | 45 ++++++++++++++++---
 4 files changed, 94 insertions(+), 20 deletions(-)

diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
index 81f0dd79b6..c40001bbff 100644
--- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -32,6 +32,7 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.transfers.base import AwsToAwsBaseOperator
+from airflow.utils.helpers import prune_dict
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
@@ -89,10 +90,13 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
         <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan>
     :param s3_key_prefix: Prefix of s3 object key
     :param process_func: How we transform a dynamodb item to bytes. By default, we dump the json
+    :param point_in_time_export: Boolean flag indicating whether the operator should use 'scan' or 'point in time export'
     :param export_time: Time in the past from which to export table data, counted in seconds from the start of
      the Unix epoch. The table export will be a snapshot of the table's state at this point in time.
     :param export_format: The format for the exported data. Valid values for ExportFormat are DYNAMODB_JSON
      or ION.
+    :param export_table_to_point_in_time_kwargs: extra parameters for the boto3
+        `export_table_to_point_in_time` function call, e.g. `ExportType`, `IncrementalExportSpecification`
     :param check_interval: The amount of time in seconds to wait between attempts. Only if ``export_time`` is
         provided.
     :param max_attempts: The maximum number of attempts to be made. Only if ``export_time`` is provided.
@@ -107,12 +111,14 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
         "s3_key_prefix",
         "export_time",
         "export_format",
+        "export_table_to_point_in_time_kwargs",
         "check_interval",
         "max_attempts",
     )
 
     template_fields_renderers = {
         "dynamodb_scan_kwargs": "json",
+        "export_table_to_point_in_time_kwargs": "json",
     }
 
     def __init__(
@@ -120,12 +126,14 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
         *,
         dynamodb_table_name: str,
         s3_bucket_name: str,
-        file_size: int,
+        file_size: int = 1000,
         dynamodb_scan_kwargs: dict[str, Any] | None = None,
         s3_key_prefix: str = "",
         process_func: Callable[[dict[str, Any]], bytes] = _convert_item_to_json_bytes,
+        point_in_time_export: bool = False,
         export_time: datetime | None = None,
         export_format: str = "DYNAMODB_JSON",
+        export_table_to_point_in_time_kwargs: dict | None = None,
         check_interval: int = 30,
         max_attempts: int = 60,
         **kwargs,
@@ -137,8 +145,10 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
         self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
         self.s3_bucket_name = s3_bucket_name
         self.s3_key_prefix = s3_key_prefix
+        self.point_in_time_export = point_in_time_export
         self.export_time = export_time
         self.export_format = export_format
+        self.export_table_to_point_in_time_kwargs = export_table_to_point_in_time_kwargs
         self.check_interval = check_interval
         self.max_attempts = max_attempts
 
@@ -148,29 +158,50 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
         return DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
 
     def execute(self, context: Context) -> None:
-        if self.export_time:
+        # There are two separate export-to-point-in-time configurations:
+        # 1. Full export, which takes the export_time arg.
+        # 2. Incremental export, which takes the incremental_export_... args.
+        # Hence export_time alone cannot be used as the indicator for the `_export_table_to_point_in_time`
+        # function. This change introduces a new boolean flag indicating whether the operator scans and
+        # exports the entire table or uses the point-in-time export functionality.
+        if self.point_in_time_export or self.export_time:
             self._export_table_to_point_in_time()
         else:
             self._export_entire_data()
 
     def _export_table_to_point_in_time(self):
         """
-        Export data from start of epoc till `export_time`.
+        Export data to point in time.
 
+        Full export exports data from the start of the epoch until `export_time`.
         Table export will be a snapshot of the table's state at this point in time.
+
+        Incremental export exports the data from one specific datetime to another specific datetime.
+
+
+        Note: S3BucketOwner is a required parameter when exporting to an S3 bucket in another account.
         """
         if self.export_time and self.export_time > datetime.now(self.export_time.tzinfo):
             raise ValueError("The export_time parameter cannot be a future time.")
 
         client = self.hook.conn.meta.client
         table_description = client.describe_table(TableName=self.dynamodb_table_name)
-        response = client.export_table_to_point_in_time(
-            TableArn=table_description.get("Table", {}).get("TableArn"),
-            ExportTime=self.export_time,
-            S3Bucket=self.s3_bucket_name,
-            S3Prefix=self.s3_key_prefix,
-            ExportFormat=self.export_format,
-        )
+
+        export_table_to_point_in_time_base_args = {
+            "TableArn": table_description.get("Table", {}).get("TableArn"),
+            "ExportTime": self.export_time,
+            "S3Bucket": self.s3_bucket_name,
+            "S3Prefix": self.s3_key_prefix,
+            "ExportFormat": self.export_format,
+        }
+        export_table_to_point_in_time_args = {
+            **export_table_to_point_in_time_base_args,
+            **self.export_table_to_point_in_time_kwargs,
+        }
+
+        args_filtered = prune_dict(export_table_to_point_in_time_args)
+
+        response = client.export_table_to_point_in_time(**args_filtered)
         waiter = self.hook.get_waiter("export_table")
         export_arn = response.get("ExportDescription", {}).get("ExportArn")
         waiter.wait(
diff --git a/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst b/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst
index 764e63f96d..74c4b78cef 100644
--- a/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst
+++ b/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst
@@ -63,13 +63,23 @@ To parallelize the replication, users can create multiple ``DynamoDBToS3Operator
     :start-after: [START howto_transfer_dynamodb_to_s3_segmented]
     :end-before: [END howto_transfer_dynamodb_to_s3_segmented]
 
-Users can also pass in ``export_time`` param to ``DynamoDBToS3Operator`` to recover data from a point in time.
+Users can also pass in the ``point_in_time_export`` boolean param to ``DynamoDBToS3Operator`` to recover data from a point in time.
+
+Full export example usage:
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
+    :end-before: [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
+
+Incremental export example usage:
 
 .. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_transfer_dynamodb_to_s3_in_some_point_in_time]
-    :end-before: [END howto_transfer_dynamodb_to_s3_in_some_point_in_time]
+    :start-after: [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]
+    :end-before: [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]
 
 Reference
 ---------
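
The docstring note above calls out that `S3BucketOwner` is required when exporting to an S3 bucket in another account, but neither the docs nor the system test show that case. Below is a hedged sketch of what such a cross-account task could look like; the table name, bucket name, task id, and account id are placeholders, and `S3BucketOwner` is passed through the new `export_table_to_point_in_time_kwargs` parameter:

    from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator

    # Hypothetical cross-account full export: the destination bucket belongs to
    # another AWS account, so that account id is forwarded as S3BucketOwner via
    # the export_table_to_point_in_time_kwargs pass-through.
    backup_db_cross_account = DynamoDBToS3Operator(
        task_id="backup_db_to_point_in_time_cross_account",
        dynamodb_table_name="airflow_rocks",
        s3_bucket_name="other-account-export-bucket",
        point_in_time_export=True,
        s3_key_prefix="cross-account-export/",
        export_table_to_point_in_time_kwargs={
            "S3BucketOwner": "111122223333",  # placeholder destination account id
        },
    )
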
diff --git a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
index ad9400c72a..d608464b90 100644
--- a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
@@ -348,6 +348,7 @@ class TestDynamodbToS3:
             dynamodb_table_name="airflow_rocks",
             s3_bucket_name="airflow-bucket",
             file_size=4000,
+            point_in_time_export=True,
             export_time=datetime(year=1983, month=1, day=1),
         )
         dynamodb_to_s3_operator.execute(context={})
@@ -362,5 +363,6 @@ class TestDynamodbToS3:
                 dynamodb_table_name="airflow_rocks",
                 s3_bucket_name="airflow-bucket",
                 file_size=4000,
+                point_in_time_export=True,
                 export_time=datetime(year=3000, month=1, day=1),
             ).execute(context={})
diff --git a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
index dc08e2d5b9..c39abd3b80 100644
--- a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
@@ -92,6 +92,15 @@ def get_export_time(table_name: str):
     return r["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]["EarliestRestorableDateTime"]
 
 
+@task
+def get_latest_export_time(table_name: str):
+    r = boto3.client("dynamodb").describe_continuous_backups(
+        TableName=table_name,
+    )
+
+    return r["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]["LatestRestorableDateTime"]
+
+
 @task
 def wait_for_bucket(s3_bucket_name):
     waiter = boto3.client("s3").get_waiter("bucket_exists")
@@ -162,18 +171,39 @@ with DAG(
     # [END howto_transfer_dynamodb_to_s3_segmented]
 
     export_time = get_export_time(table_name)
-    # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time]
-    backup_db_to_point_in_time = DynamoDBToS3Operator(
-        task_id="backup_db_to_point_in_time",
+    latest_export_time = get_latest_export_time(table_name)
+    # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
+    backup_db_to_point_in_time_full_export = DynamoDBToS3Operator(
+        task_id="backup_db_to_point_in_time_full_export",
         dynamodb_table_name=table_name,
-        file_size=1000,
         s3_bucket_name=bucket_name,
+        point_in_time_export=True,
         export_time=export_time,
         s3_key_prefix=f"{S3_KEY_PREFIX}-3-",
     )
-    # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time]
+    # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
+
+    # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]
+    backup_db_to_point_in_time_incremental_export = DynamoDBToS3Operator(
+        task_id="backup_db_to_point_in_time_incremental_export",
+        dynamodb_table_name=table_name,
+        s3_bucket_name=bucket_name,
+        point_in_time_export=True,
+        s3_key_prefix=f"{S3_KEY_PREFIX}-4-",
+        export_table_to_point_in_time_kwargs={
+            "ExportType": "INCREMENTAL_EXPORT",
+            "IncrementalExportSpecification": {
+                "ExportFromTime": export_time,
+                "ExportToTime": latest_export_time,
+                "ExportViewType": "NEW_AND_OLD_IMAGES",
+            },
+        },
+    )
+    # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]
+
     # This operation can take a long time to complete
-    backup_db_to_point_in_time.max_attempts = 90
+    backup_db_to_point_in_time_full_export.max_attempts = 90
+    backup_db_to_point_in_time_incremental_export.max_attempts = 90
 
     delete_table = delete_dynamodb_table(table_name=table_name)
 
@@ -195,7 +225,8 @@ with DAG(
         backup_db_segment_1,
         backup_db_segment_2,
         export_time,
-        backup_db_to_point_in_time,
+        backup_db_to_point_in_time_full_export,
+        backup_db_to_point_in_time_incremental_export,
         # TEST TEARDOWN
         delete_table,
         delete_bucket,
