This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0b3b6704cb Fix AWS system test example_dynamodb_to_s3 (#31362)
0b3b6704cb is described below
commit 0b3b6704cb12a3b8f22da79d80b3db85528418b7
Author: Vincent <[email protected]>
AuthorDate: Wed May 17 13:55:50 2023 -0400
Fix AWS system test example_dynamodb_to_s3 (#31362)
---
.../providers/amazon/aws/transfers/dynamodb_to_s3.py | 15 ++++++++++-----
.../amazon/aws/transfers/test_dynamodb_to_s3.py | 12 ++++++------
.../providers/amazon/aws/example_dynamodb_to_s3.py | 19 ++++++++++++++++++-
3 files changed, 34 insertions(+), 12 deletions(-)
diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
index c8eee0b9f5..48067a666f 100644
--- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -97,9 +97,14 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
template_fields: Sequence[str] = (
*AwsToAwsBaseOperator.template_fields,
+ "dynamodb_table_name",
"s3_bucket_name",
+ "file_size",
+ "dynamodb_scan_kwargs",
"s3_key_prefix",
- "dynamodb_table_name",
+ "process_func",
+ "export_time",
+ "export_format",
)
template_fields_renderers = {
@@ -129,9 +134,6 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
self.export_time = export_time
self.export_format = export_format
- if self.export_time and self.export_time > datetime.now():
- raise ValueError("The export_time parameter cannot be a future
time.")
-
@cached_property
def hook(self):
"""Create DynamoDBHook"""
@@ -148,6 +150,9 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
Export data from start of epoc till `export_time`. Table export will be a snapshot of the table's
state at this point in time.
"""
+ if self.export_time and self.export_time > datetime.now(self.export_time.tzinfo):
+ raise ValueError("The export_time parameter cannot be a future time.")
+
client = self.hook.conn.meta.client
table_description = client.describe_table(TableName=self.dynamodb_table_name)
response = client.export_table_to_point_in_time(
@@ -163,7 +168,7 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
def _export_entire_data(self):
"""Export all data from the table."""
- table = self.hook.get_conn().Table(self.dynamodb_table_name)
+ table = self.hook.conn.Table(self.dynamodb_table_name)
scan_kwargs = copy(self.dynamodb_scan_kwargs) if self.dynamodb_scan_kwargs else {}
err = None
f: IO[Any]
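[Editorial note] The last hunk above also swaps self.hook.get_conn().Table(...) for self.hook.conn.Table(...); the hook's conn property is a cached wrapper around get_conn(), so both hand back the same boto3 DynamoDB service resource (the same object the operator reaches into via conn.meta.client). A hedged sketch of the resource-level access involved, with "my-table" as a placeholder name:

    import boto3

    dynamodb = boto3.resource("dynamodb")  # what the hook's .conn wraps here
    table = dynamodb.Table("my-table")     # lazy handle; no API call yet
    page = table.scan(Limit=10)            # Scan returns a page of items plus paging metadata
    items = page["Items"]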
diff --git a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
index 5bb02448f2..bc1b9751b7 100644
--- a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
@@ -70,7 +70,7 @@ class TestDynamodbToS3:
]
table = MagicMock()
table.return_value.scan.side_effect = responses
- mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table
+ mock_aws_dynamodb_hook.return_value.conn.Table = table
s3_client = MagicMock()
s3_client.return_value.upload_file = self.mock_upload_file
@@ -99,7 +99,7 @@ class TestDynamodbToS3:
]
table = MagicMock()
table.return_value.scan.side_effect = responses
- mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table
+ mock_aws_dynamodb_hook.return_value.conn.Table = table
s3_client = MagicMock()
s3_client.return_value.upload_file = self.mock_upload_file
@@ -198,7 +198,7 @@ class TestDynamodbToS3:
]
table = MagicMock()
table.return_value.scan.side_effect = responses
- mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table
+ mock_aws_dynamodb_hook.return_value.conn.Table = table
s3_client = MagicMock()
s3_client.return_value.upload_file = self.mock_upload_file
@@ -234,7 +234,7 @@ class TestDynamodbToS3:
]
table = MagicMock()
table.return_value.scan.side_effect = responses
- mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table
+ mock_aws_dynamodb_hook.return_value.conn.Table = table
s3_client = MagicMock()
s3_client.return_value.upload_file = self.mock_upload_file
@@ -272,7 +272,7 @@ class TestDynamodbToS3:
]
table = MagicMock()
table.return_value.scan.side_effect = responses
- mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table
+ mock_aws_dynamodb_hook.return_value.conn.Table = table
s3_client = MagicMock()
s3_client.return_value.upload_file = self.mock_upload_file
@@ -356,4 +356,4 @@ class TestDynamodbToS3:
s3_bucket_name="airflow-bucket",
file_size=4000,
export_time=datetime(year=3000, month=1, day=1),
- )
+ ).execute(context={})
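[Editorial note] Because the guard now fires during execution rather than in __init__, merely constructing the operator with a future export_time no longer raises, so the test has to call execute as well. A minimal sketch of the resulting pattern (the kwargs mirror the hunk above, but the table name and the surrounding pytest.raises context are not shown in the diff and are illustrative here):

    from datetime import datetime

    import pytest

    from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator

    def test_future_export_time_rejected():
        op = DynamoDBToS3Operator(
            task_id="dynamodb_to_s3",
            dynamodb_table_name="my-table",  # placeholder, not from the test file
            s3_bucket_name="airflow-bucket",
            file_size=4000,
            export_time=datetime(year=3000, month=1, day=1),
        )
        with pytest.raises(ValueError, match="future time"):
            op.execute(context={})  # the check now runs here, not in __init__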
diff --git a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
index efe16dd86a..e482bc023e 100644
--- a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
@@ -56,9 +56,24 @@ def set_up_table(table_name: str):
boto3.client("dynamodb").get_waiter("table_exists").wait(
TableName=table_name, WaiterConfig={"Delay": 10, "MaxAttempts": 10}
)
+ boto3.client("dynamodb").update_continuous_backups(
+ TableName=table_name,
+ PointInTimeRecoverySpecification={
+ "PointInTimeRecoveryEnabled": True,
+ },
+ )
table.put_item(Item={"ID": "123", "Value": "Testing"})
+@task
+def get_export_time(table_name: str):
+ r = boto3.client("dynamodb").describe_continuous_backups(
+ TableName=table_name,
+ )
+
+ return r["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]["EarliestRestorableDateTime"]
+
+
@task
def wait_for_bucket(s3_bucket_name):
waiter = boto3.client("s3").get_waiter("bucket_exists")
@@ -128,13 +143,14 @@ with DAG(
)
# [END howto_transfer_dynamodb_to_s3_segmented]
+ export_time = get_export_time(table_name)
# [START howto_transfer_dynamodb_to_s3_in_some_point_in_time]
backup_db_to_point_in_time = DynamoDBToS3Operator(
task_id="backup_db_to_point_in_time",
dynamodb_table_name=table_name,
file_size=1000,
s3_bucket_name=bucket_name,
- export_time=datetime.utcnow() - datetime.timedelta(days=7),
+ export_time=export_time,
s3_key_prefix=f"{S3_KEY_PREFIX}-3-",
)
# [END howto_transfer_dynamodb_to_s3_in_some_point_in_time]
@@ -158,6 +174,7 @@ with DAG(
backup_db,
backup_db_segment_1,
backup_db_segment_2,
+ export_time,
backup_db_to_point_in_time,
# TEST TEARDOWN
delete_table,
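[Editorial note] For context on the system-test side: a point-in-time export requires continuous backups (PITR) to be enabled on the table, and the previous hard-coded export_time of utcnow() - timedelta(days=7) presumably fell outside the restorable window of a table created minutes earlier. The new tasks enable PITR and then read the earliest restorable timestamp, which is guaranteed to lie in the past. A standalone sketch of that round trip ("my-table" is a placeholder):

    import boto3

    client = boto3.client("dynamodb")
    client.update_continuous_backups(
        TableName="my-table",
        PointInTimeRecoverySpecification={"PointInTimeRecoveryEnabled": True},
    )
    desc = client.describe_continuous_backups(TableName="my-table")
    export_time = desc["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"][
        "EarliestRestorableDateTime"
    ]
    # EarliestRestorableDateTime is a timezone-aware datetime in the past,
    # so it also satisfies the operator's new runtime check on export_time.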