This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8c2f56375a Skip task in example_dynamodb_to_s3.py (#41546)
8c2f56375a is described below
commit 8c2f56375a4560361c5f96d54c92c46674473387
Author: D. Ferruzzi <[email protected]>
AuthorDate: Fri Aug 16 13:29:42 2024 -0700
Skip task in example_dynamodb_to_s3.py (#41546)
`backup_db_to_point_in_time_incremental_export` requires at least a
15-minute window of available data, causing the system tests to fail. Moving
it into a branching task group will let us keep the code snippet in the docs
without actually running the task in the test.
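For readers skimming the diff below, this is roughly the branching pattern the change relies on: a @task.branch task inside a @task_group decides at runtime whether to run the expensive operator or jump straight to a join task, and the join uses TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS so it succeeds on either path. The sketch is illustrative only; names such as maybe_run_export, choose_branch, run_export and MIN_WINDOW are placeholders, and the real DAG wires in DynamoDBToS3Operator and the DynamoDB export timestamps instead of EmptyOperator and hard-coded datetimes.

# Minimal, self-contained sketch of the skip-or-run branching pattern (assumed names, not the real DAG).
from __future__ import annotations

from datetime import datetime, timedelta

from airflow.decorators import task, task_group
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.edgemodifier import Label
from airflow.utils.trigger_rule import TriggerRule

MIN_WINDOW = timedelta(minutes=15)  # hypothetical constant mirroring the 15-minute requirement


@task_group
def maybe_run_export(start_time: datetime, end_time: datetime):
    """Run the export only when the data window is wide enough, otherwise branch straight to the join."""
    # Stand-in for the expensive operator (DynamoDBToS3Operator in the example DAG).
    run_export = EmptyOperator(task_id="run_export")

    # The join must succeed whether the export ran or was skipped by the branch.
    end_workflow = EmptyOperator(
        task_id="end_workflow",
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    @task.branch
    def choose_branch(start: datetime, end: datetime) -> str:
        # Returning a task_id tells Airflow which downstream task to run; the other path is skipped.
        not_enough_time = end < (start + MIN_WINDOW)
        return end_workflow.task_id if not_enough_time else run_export.task_id

    branch = choose_branch(start_time, end_time)

    # Both paths converge on end_workflow; the Label only annotates the skip edge in the graph view.
    chain(branch, Label("export skipped"), end_workflow)
    chain(branch, run_export, end_workflow)


with DAG(
    dag_id="branching_task_group_sketch",  # hypothetical DAG id
    schedule="@once",
    start_date=datetime(2024, 1, 1),
    catchup=False,
):
    maybe_run_export(start_time=datetime(2024, 1, 1, 0, 0), end_time=datetime(2024, 1, 1, 0, 5))

In the actual change below, start_time comes from get_export_time(), end_time comes from get_latest_export_time() inside the group, and the skip edge is labelled "Incremental backup skipped".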
---
.../providers/amazon/aws/example_dynamodb_to_s3.py | 93 ++++++++++++++--------
1 file changed, 58 insertions(+), 35 deletions(-)
diff --git a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
index 225aaa7803..e22bc2080d 100644
--- a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
@@ -17,17 +17,19 @@
from __future__ import annotations
import logging
-from datetime import datetime
+from datetime import datetime, timedelta
import boto3
import tenacity
from tenacity import before_log, before_sleep_log
-from airflow.decorators import task
+from airflow.decorators import task, task_group
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
+from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator
+from airflow.utils.edgemodifier import Label
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
@@ -92,15 +94,6 @@ def get_export_time(table_name: str):
    return r["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]["EarliestRestorableDateTime"]
-@task
-def get_latest_export_time(table_name: str):
- r = boto3.client("dynamodb").describe_continuous_backups(
- TableName=table_name,
- )
-
-    return r["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]["LatestRestorableDateTime"]
-
-
@task
def wait_for_bucket(s3_bucket_name):
waiter = boto3.client("s3").get_waiter("bucket_exists")
@@ -115,6 +108,59 @@ def delete_dynamodb_table(table_name: str):
)
+@task_group
+def incremental_export(table_name: str, start_time: datetime):
+ """
+    Incremental export requires a minimum window of 15 minutes of data to export.
+    This task group allows us to have the sample code snippet for the docs while
+ skipping the task when we run the actual test.
+ """
+
+ @task
+ def get_latest_export_time(table_name: str):
+ r = boto3.client("dynamodb").describe_continuous_backups(
+ TableName=table_name,
+ )
+
+        return r["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]["LatestRestorableDateTime"]
+
+ end_time = get_latest_export_time(table_name)
+
+    # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]
+ backup_db_to_point_in_time_incremental_export = DynamoDBToS3Operator(
+ task_id="backup_db_to_point_in_time_incremental_export",
+ dynamodb_table_name=table_name,
+ s3_bucket_name=bucket_name,
+ point_in_time_export=True,
+ s3_key_prefix=f"{S3_KEY_PREFIX}-4-",
+ export_table_to_point_in_time_kwargs={
+ "ExportType": "INCREMENTAL_EXPORT",
+ "IncrementalExportSpecification": {
+ "ExportFromTime": start_time,
+ "ExportToTime": end_time,
+ "ExportViewType": "NEW_AND_OLD_IMAGES",
+ },
+ },
+ )
+    # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]
+ # This operation can take a long time to complete
+ backup_db_to_point_in_time_incremental_export.max_attempts = 90
+
+ @task.branch
+ def skip_incremental_export(start_time: datetime, end_time: datetime):
+ not_enough_time = end_time < (start_time + timedelta(minutes=15))
+ return (
+            end_workflow.task_id if not_enough_time else backup_db_to_point_in_time_incremental_export.task_id
+ )
+
+ skip_incremental = skip_incremental_export(start_time, end_time)
+
+    end_workflow = EmptyOperator(task_id="end_workflow", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
+
+    chain(end_time, skip_incremental, Label("Incremental backup skipped"), end_workflow)
+    chain(end_time, skip_incremental, backup_db_to_point_in_time_incremental_export, end_workflow)
+
+
with DAG(
dag_id=DAG_ID,
schedule="@once",
@@ -171,7 +217,6 @@ with DAG(
# [END howto_transfer_dynamodb_to_s3_segmented]
export_time = get_export_time(table_name)
- latest_export_time = get_latest_export_time(table_name)
# [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
backup_db_to_point_in_time_full_export = DynamoDBToS3Operator(
task_id="backup_db_to_point_in_time_full_export",
@@ -182,28 +227,7 @@ with DAG(
s3_key_prefix=f"{S3_KEY_PREFIX}-3-",
)
# [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
-
-    # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]
- backup_db_to_point_in_time_incremental_export = DynamoDBToS3Operator(
- task_id="backup_db_to_point_in_time_incremental_export",
- dynamodb_table_name=table_name,
- s3_bucket_name=bucket_name,
- point_in_time_export=True,
- s3_key_prefix=f"{S3_KEY_PREFIX}-4-",
- export_table_to_point_in_time_kwargs={
- "ExportType": "INCREMENTAL_EXPORT",
- "IncrementalExportSpecification": {
- "ExportFromTime": export_time,
- "ExportToTime": latest_export_time,
- "ExportViewType": "NEW_AND_OLD_IMAGES",
- },
- },
- )
-    # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]
-
- # This operation can take a long time to complete
backup_db_to_point_in_time_full_export.max_attempts = 90
- backup_db_to_point_in_time_incremental_export.max_attempts = 90
delete_table = delete_dynamodb_table(table_name=table_name)
@@ -225,9 +249,8 @@ with DAG(
backup_db_segment_1,
backup_db_segment_2,
export_time,
- latest_export_time,
backup_db_to_point_in_time_full_export,
- backup_db_to_point_in_time_incremental_export,
+ incremental_export(table_name=table_name, start_time=export_time),
# TEST TEARDOWN
delete_table,
delete_bucket,