This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b1196460db Add `check_interval` and `max_attempts` as parameter of `DynamoDBToS3Operator` (#34972)
b1196460db is described below
commit b1196460db1a21b2c6c3ef2e841fc6d0c22afe97
Author: Vincent <[email protected]>
AuthorDate: Mon Oct 16 15:09:20 2023 -0400
Add `check_interval` and `max_attempts` as parameter of `DynamoDBToS3Operator` (#34972)
---
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py | 14 +++++++++++++-
.../system/providers/amazon/aws/example_dynamodb_to_s3.py | 2 ++
2 files changed, 15 insertions(+), 1 deletion(-)
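
For orientation, here is a minimal, hypothetical usage sketch of the operator with the two new knobs. It assumes the operator's pre-existing dynamodb_table_name, s3_bucket_name, s3_key_prefix and export_time parameters; the DAG id, task id, table and bucket names below are illustrative and not taken from this commit.

    from datetime import datetime, timezone

    from airflow import DAG
    from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator

    with DAG(dag_id="example_dynamodb_export", schedule=None, start_date=datetime(2023, 10, 1)) as dag:
        backup_to_s3 = DynamoDBToS3Operator(
            task_id="backup_to_s3",
            dynamodb_table_name="my-table",    # illustrative table name
            s3_bucket_name="my-bucket",        # illustrative bucket name
            s3_key_prefix="dynamodb-export-",
            export_time=datetime(2023, 10, 16, tzinfo=timezone.utc),  # point-in-time export
            check_interval=30,  # seconds between polls of the export status
            max_attempts=90,    # stop waiting after 90 polls (~45 minutes at 30 s)
        )
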
diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
index b83ff48906..5351f3ff7c 100644
--- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -92,6 +92,9 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
         the Unix epoch. The table export will be a snapshot of the table's state at this point in time.
     :param export_format: The format for the exported data. Valid values for ExportFormat are DYNAMODB_JSON
         or ION.
+    :param check_interval: The amount of time in seconds to wait between attempts. Only if ``export_time`` is
+        provided.
+    :param max_attempts: The maximum number of attempts to be made. Only if ``export_time`` is provided.
     """

     template_fields: Sequence[str] = (
@@ -104,6 +107,8 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
         "process_func",
         "export_time",
         "export_format",
+        "check_interval",
+        "max_attempts",
     )

     template_fields_renderers = {
@@ -121,6 +126,8 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
         process_func: Callable[[dict[str, Any]], bytes] = _convert_item_to_json_bytes,
         export_time: datetime | None = None,
         export_format: str = "DYNAMODB_JSON",
+        check_interval: int = 30,
+        max_attempts: int = 60,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -132,6 +139,8 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
         self.s3_key_prefix = s3_key_prefix
         self.export_time = export_time
         self.export_format = export_format
+        self.check_interval = check_interval
+        self.max_attempts = max_attempts

     @cached_property
     def hook(self):
@@ -164,7 +173,10 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
         )
         waiter = self.hook.get_waiter("export_table")
         export_arn = response.get("ExportDescription", {}).get("ExportArn")
-        waiter.wait(ExportArn=export_arn)
+        waiter.wait(
+            ExportArn=export_arn,
+            WaiterConfig={"Delay": self.check_interval, "MaxAttempts": self.max_attempts},
+        )

     def _export_entire_data(self):
         """Export all data from the table."""
diff --git a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
index 70ea9ad77e..dc08e2d5b9 100644
--- a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
@@ -172,6 +172,8 @@ with DAG(
         s3_key_prefix=f"{S3_KEY_PREFIX}-3-",
     )
     # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time]
+    # This operation can take a long time to complete
+    backup_db_to_point_in_time.max_attempts = 90

     delete_table = delete_dynamodb_table(table_name=table_name)
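
Setting the attribute after the task is created keeps the documented snippet between the [START]/[END] markers unchanged; because max_attempts is a plain (and templated) instance attribute, the assignment has the same effect as passing the value in the constructor. A hedged sketch of that alternative follows, with the other keyword arguments illustrative only, since the full task definition is not part of this diff.

    backup_db_to_point_in_time = DynamoDBToS3Operator(
        task_id="backup_db_to_point_in_time",
        dynamodb_table_name=table_name,     # assumed variable from the example DAG
        s3_bucket_name="my-export-bucket",  # illustrative bucket name
        s3_key_prefix=f"{S3_KEY_PREFIX}-3-",
        export_time=export_time,            # assumed variable from the example DAG
        max_attempts=90,                    # same effect as the attribute assignment above
    )
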