This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b3b6704cb Fix AWS system test example_dynamodb_to_s3 (#31362)
0b3b6704cb is described below

commit 0b3b6704cb12a3b8f22da79d80b3db85528418b7
Author: Vincent <[email protected]>
AuthorDate: Wed May 17 13:55:50 2023 -0400

    Fix AWS system test example_dynamodb_to_s3 (#31362)
---
 .../providers/amazon/aws/transfers/dynamodb_to_s3.py  | 15 ++++++++++-----
 .../amazon/aws/transfers/test_dynamodb_to_s3.py       | 12 ++++++------
 .../providers/amazon/aws/example_dynamodb_to_s3.py    | 19 ++++++++++++++++++-
 3 files changed, 34 insertions(+), 12 deletions(-)

diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
index c8eee0b9f5..48067a666f 100644
--- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -97,9 +97,14 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
 
     template_fields: Sequence[str] = (
         *AwsToAwsBaseOperator.template_fields,
+        "dynamodb_table_name",
         "s3_bucket_name",
+        "file_size",
+        "dynamodb_scan_kwargs",
         "s3_key_prefix",
-        "dynamodb_table_name",
+        "process_func",
+        "export_time",
+        "export_format",
     )
 
     template_fields_renderers = {
@@ -129,9 +134,6 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
         self.export_time = export_time
         self.export_format = export_format
 
-        if self.export_time and self.export_time > datetime.now():
-            raise ValueError("The export_time parameter cannot be a future time.")
-
     @cached_property
     def hook(self):
         """Create DynamoDBHook"""
@@ -148,6 +150,9 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
         Export data from start of epoch till `export_time`. Table export will be a snapshot of the table's
         state at this point in time.
         """
+        if self.export_time and self.export_time > datetime.now(self.export_time.tzinfo):
+            raise ValueError("The export_time parameter cannot be a future time.")
+
         client = self.hook.conn.meta.client
         table_description = client.describe_table(TableName=self.dynamodb_table_name)
         response = client.export_table_to_point_in_time(
@@ -163,7 +168,7 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
 
     def _export_entire_data(self):
         """Export all data from the table."""
-        table = self.hook.get_conn().Table(self.dynamodb_table_name)
+        table = self.hook.conn.Table(self.dynamodb_table_name)
         scan_kwargs = copy(self.dynamodb_scan_kwargs) if self.dynamodb_scan_kwargs else {}
         err = None
         f: IO[Any]
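
The core fix above moves the future-time check out of __init__ and into the export path, presumably because export_time is now a template field and only resolves at runtime, and it builds "now" from export_time's own tzinfo. A standalone sketch of why, none of it from the commit itself: Python refuses to order naive and aware datetimes, so the comparison has to match whatever awareness the caller's export_time carries.

    from datetime import datetime, timezone

    # Standalone sketch, not the commit's code: ordering an aware datetime
    # against a naive one raises TypeError, which is why the operator now
    # compares export_time with datetime.now(export_time.tzinfo) rather
    # than a bare datetime.now().
    aware = datetime(2023, 5, 1, tzinfo=timezone.utc)
    naive = datetime(2023, 5, 1)

    try:
        aware > datetime.now()  # aware value vs naive "now"
    except TypeError as err:
        print(err)  # can't compare offset-naive and offset-aware datetimes

    print(aware > datetime.now(aware.tzinfo))  # aware "now": compares cleanly
    print(naive > datetime.now(naive.tzinfo))  # tzinfo is None, so "now" stays naive
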
diff --git a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
index 5bb02448f2..bc1b9751b7 100644
--- a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
@@ -70,7 +70,7 @@ class TestDynamodbToS3:
         ]
         table = MagicMock()
         table.return_value.scan.side_effect = responses
-        mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table
+        mock_aws_dynamodb_hook.return_value.conn.Table = table
 
         s3_client = MagicMock()
         s3_client.return_value.upload_file = self.mock_upload_file
@@ -99,7 +99,7 @@ class TestDynamodbToS3:
         ]
         table = MagicMock()
         table.return_value.scan.side_effect = responses
-        mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table
+        mock_aws_dynamodb_hook.return_value.conn.Table = table
 
         s3_client = MagicMock()
         s3_client.return_value.upload_file = self.mock_upload_file
@@ -198,7 +198,7 @@ class TestDynamodbToS3:
         ]
         table = MagicMock()
         table.return_value.scan.side_effect = responses
-        mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table
+        mock_aws_dynamodb_hook.return_value.conn.Table = table
 
         s3_client = MagicMock()
         s3_client.return_value.upload_file = self.mock_upload_file
@@ -234,7 +234,7 @@ class TestDynamodbToS3:
         ]
         table = MagicMock()
         table.return_value.scan.side_effect = responses
-        mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table
+        mock_aws_dynamodb_hook.return_value.conn.Table = table
 
         s3_client = MagicMock()
         s3_client.return_value.upload_file = self.mock_upload_file
@@ -272,7 +272,7 @@ class TestDynamodbToS3:
         ]
         table = MagicMock()
         table.return_value.scan.side_effect = responses
-        mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table
+        mock_aws_dynamodb_hook.return_value.conn.Table = table
 
         s3_client = MagicMock()
         s3_client.return_value.upload_file = self.mock_upload_file
@@ -356,4 +356,4 @@ class TestDynamodbToS3:
                 s3_bucket_name="airflow-bucket",
                 file_size=4000,
                 export_time=datetime(year=3000, month=1, day=1),
-            )
+            ).execute(context={})
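
Two matching updates in the tests: the mock path follows the hook's move from get_conn() to the conn property, and the future-export_time case now has to call execute(), since the check no longer fires in __init__. A small sketch of my own, not from the commit, showing why the MagicMock stub must mirror the exact attribute path the production code dereferences:

    from unittest.mock import MagicMock

    # Sketch only: MagicMock auto-creates attribute chains, so a stub set on
    # get_conn.return_value is invisible to code that reads the conn property,
    # and vice versa.
    hook = MagicMock()
    hook.get_conn.return_value.Table = lambda name: f"via get_conn(): {name}"
    hook.conn.Table = lambda name: f"via conn property: {name}"

    print(hook.get_conn().Table("t"))  # resolves through the old call path
    print(hook.conn.Table("t"))        # resolves through the new property path
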
diff --git a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
index efe16dd86a..e482bc023e 100644
--- a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
@@ -56,9 +56,24 @@ def set_up_table(table_name: str):
     boto3.client("dynamodb").get_waiter("table_exists").wait(
         TableName=table_name, WaiterConfig={"Delay": 10, "MaxAttempts": 10}
     )
+    boto3.client("dynamodb").update_continuous_backups(
+        TableName=table_name,
+        PointInTimeRecoverySpecification={
+            "PointInTimeRecoveryEnabled": True,
+        },
+    )
     table.put_item(Item={"ID": "123", "Value": "Testing"})
 
 
+@task
+def get_export_time(table_name: str):
+    r = boto3.client("dynamodb").describe_continuous_backups(
+        TableName=table_name,
+    )
+
+    return r["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]["EarliestRestorableDateTime"]
+
+
 @task
 def wait_for_bucket(s3_bucket_name):
     waiter = boto3.client("s3").get_waiter("bucket_exists")
@@ -128,13 +143,14 @@ with DAG(
     )
     # [END howto_transfer_dynamodb_to_s3_segmented]
 
+    export_time = get_export_time(table_name)
     # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time]
     backup_db_to_point_in_time = DynamoDBToS3Operator(
         task_id="backup_db_to_point_in_time",
         dynamodb_table_name=table_name,
         file_size=1000,
         s3_bucket_name=bucket_name,
-        export_time=datetime.utcnow() - datetime.timedelta(days=7),
+        export_time=export_time,
         s3_key_prefix=f"{S3_KEY_PREFIX}-3-",
     )
     # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time]
@@ -158,6 +174,7 @@ with DAG(
         backup_db,
         backup_db_segment_1,
         backup_db_segment_2,
+        export_time,
         backup_db_to_point_in_time,
         # TEST TEARDOWN
         delete_table,

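In the system test itself, the old export_time of datetime.utcnow() - datetime.timedelta(days=7) failed on two counts: if the file imports only the datetime class (as the removed line suggests), datetime.timedelta is not even a valid attribute, and either way a point seven days back need not fall inside the table's restorable window. The new tasks enable point-in-time recovery and read the window's start instead. A condensed sketch of that boto3 flow, with "my-table" as a hypothetical name:

    import boto3

    # Condensed sketch of the calls the system test now makes; "my-table" is
    # hypothetical. export_table_to_point_in_time only works once PITR is on,
    # and a usable export_time must fall inside the restorable window.
    client = boto3.client("dynamodb")

    client.update_continuous_backups(
        TableName="my-table",
        PointInTimeRecoverySpecification={"PointInTimeRecoveryEnabled": True},
    )

    desc = client.describe_continuous_backups(TableName="my-table")
    earliest = desc["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"][
        "EarliestRestorableDateTime"
    ]
    print(earliest)  # timezone-aware datetime, so the operator's check holds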