This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8c2f56375a Skip task in example_dynamodb_to_s3.py (#41546)
8c2f56375a is described below

commit 8c2f56375a4560361c5f96d54c92c46674473387
Author: D. Ferruzzi <[email protected]>
AuthorDate: Fri Aug 16 13:29:42 2024 -0700

    Skip task in example_dynamodb_to_s3.py (#41546)
    
    `backup_db_to_point_in_time_incremental_export` requires at least a
    15-minute window of available data, causing the system tests to fail.
    Moving it into a branching task group will let us keep the code snippet
    in the docs without actually running the task in the test.
---
 .../providers/amazon/aws/example_dynamodb_to_s3.py | 93 ++++++++++++++--------
 1 file changed, 58 insertions(+), 35 deletions(-)

diff --git a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
index 225aaa7803..e22bc2080d 100644
--- a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
@@ -17,17 +17,19 @@
 from __future__ import annotations
 
 import logging
-from datetime import datetime
+from datetime import datetime, timedelta
 
 import boto3
 import tenacity
 from tenacity import before_log, before_sleep_log
 
-from airflow.decorators import task
+from airflow.decorators import task, task_group
 from airflow.models.baseoperator import chain
 from airflow.models.dag import DAG
+from airflow.operators.empty import EmptyOperator
 from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
 from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator
+from airflow.utils.edgemodifier import Label
 from airflow.utils.trigger_rule import TriggerRule
 from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
 
@@ -92,15 +94,6 @@ def get_export_time(table_name: str):
     return r["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]["EarliestRestorableDateTime"]
 
 
-@task
-def get_latest_export_time(table_name: str):
-    r = boto3.client("dynamodb").describe_continuous_backups(
-        TableName=table_name,
-    )
-
-    return r["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]["LatestRestorableDateTime"]
-
-
 @task
 def wait_for_bucket(s3_bucket_name):
     waiter = boto3.client("s3").get_waiter("bucket_exists")
@@ -115,6 +108,59 @@ def delete_dynamodb_table(table_name: str):
     )
 
 
+@task_group
+def incremental_export(table_name: str, start_time: datetime):
+    """
+    Incremental export requires a minimum window of 15 minutes of data to export.
+    This task group allows us to have the sample code snippet for the docs while
+    skipping the task when we run the actual test.
+    """
+
+    @task
+    def get_latest_export_time(table_name: str):
+        r = boto3.client("dynamodb").describe_continuous_backups(
+            TableName=table_name,
+        )
+
+        return r["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]["LatestRestorableDateTime"]
+
+    end_time = get_latest_export_time(table_name)
+
+    # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]
+    backup_db_to_point_in_time_incremental_export = DynamoDBToS3Operator(
+        task_id="backup_db_to_point_in_time_incremental_export",
+        dynamodb_table_name=table_name,
+        s3_bucket_name=bucket_name,
+        point_in_time_export=True,
+        s3_key_prefix=f"{S3_KEY_PREFIX}-4-",
+        export_table_to_point_in_time_kwargs={
+            "ExportType": "INCREMENTAL_EXPORT",
+            "IncrementalExportSpecification": {
+                "ExportFromTime": start_time,
+                "ExportToTime": end_time,
+                "ExportViewType": "NEW_AND_OLD_IMAGES",
+            },
+        },
+    )
+    # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]
+    # This operation can take a long time to complete
+    backup_db_to_point_in_time_incremental_export.max_attempts = 90
+
+    @task.branch
+    def skip_incremental_export(start_time: datetime, end_time: datetime):
+        not_enough_time = end_time < (start_time + timedelta(minutes=15))
+        return (
+            end_workflow.task_id if not_enough_time else backup_db_to_point_in_time_incremental_export.task_id
+        )
+
+    skip_incremental = skip_incremental_export(start_time, end_time)
+
+    end_workflow = EmptyOperator(task_id="end_workflow", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
+
+    chain(end_time, skip_incremental, Label("Incremental backup skipped"), end_workflow)
+    chain(end_time, skip_incremental, backup_db_to_point_in_time_incremental_export, end_workflow)
+
+
 with DAG(
     dag_id=DAG_ID,
     schedule="@once",
@@ -171,7 +217,6 @@ with DAG(
     # [END howto_transfer_dynamodb_to_s3_segmented]
 
     export_time = get_export_time(table_name)
-    latest_export_time = get_latest_export_time(table_name)
     # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
     backup_db_to_point_in_time_full_export = DynamoDBToS3Operator(
         task_id="backup_db_to_point_in_time_full_export",
@@ -182,28 +227,7 @@ with DAG(
         s3_key_prefix=f"{S3_KEY_PREFIX}-3-",
     )
     # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
-
-    # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]
-    backup_db_to_point_in_time_incremental_export = DynamoDBToS3Operator(
-        task_id="backup_db_to_point_in_time_incremental_export",
-        dynamodb_table_name=table_name,
-        s3_bucket_name=bucket_name,
-        point_in_time_export=True,
-        s3_key_prefix=f"{S3_KEY_PREFIX}-4-",
-        export_table_to_point_in_time_kwargs={
-            "ExportType": "INCREMENTAL_EXPORT",
-            "IncrementalExportSpecification": {
-                "ExportFromTime": export_time,
-                "ExportToTime": latest_export_time,
-                "ExportViewType": "NEW_AND_OLD_IMAGES",
-            },
-        },
-    )
-    # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]
-
-    # This operation can take a long time to complete
     backup_db_to_point_in_time_full_export.max_attempts = 90
-    backup_db_to_point_in_time_incremental_export.max_attempts = 90
 
     delete_table = delete_dynamodb_table(table_name=table_name)
 
@@ -225,9 +249,8 @@ with DAG(
         backup_db_segment_1,
         backup_db_segment_2,
         export_time,
-        latest_export_time,
         backup_db_to_point_in_time_full_export,
-        backup_db_to_point_in_time_incremental_export,
+        incremental_export(table_name=table_name, start_time=export_time),
         # TEST TEARDOWN
         delete_table,
         delete_bucket,

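For reference, below is a minimal standalone sketch of the branching-task-group pattern this commit applies, assuming Airflow 2.x with the TaskFlow API (`@task.branch`, `@task_group`) and `EmptyOperator`. The DAG id, task names, and datetime values are illustrative placeholders and are not taken from the committed file; only the 15-minute threshold and the NONE_FAILED_MIN_ONE_SUCCESS trigger rule mirror the committed code.

from __future__ import annotations

from datetime import datetime, timedelta

from airflow.decorators import task, task_group
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="branching_task_group_sketch",  # illustrative DAG id
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
):

    @task_group
    def maybe_run_expensive_task(start_time: datetime, end_time: datetime):
        # Stand-in for the expensive task that needs at least a 15-minute window.
        expensive_task = EmptyOperator(task_id="expensive_task")

        # The branch task returns the task_id to follow; the other path is skipped.
        @task.branch
        def choose_branch(start: datetime, end: datetime):
            not_enough_time = end < (start + timedelta(minutes=15))
            return end_workflow.task_id if not_enough_time else expensive_task.task_id

        # Join task that runs no matter which branch was taken.
        end_workflow = EmptyOperator(
            task_id="end_workflow",
            trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
        )

        branch = choose_branch(start_time, end_time)
        chain(branch, expensive_task, end_workflow)
        chain(branch, end_workflow)

    # Placeholder window of only 10 minutes, so the expensive task would be skipped.
    maybe_run_expensive_task(
        start_time=datetime(2024, 8, 16, 12, 0),
        end_time=datetime(2024, 8, 16, 12, 10),
    )

Because the branch decision is made at runtime, the documented snippet stays in the DAG file while the system test only executes it when a sufficiently long window of data is available.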