This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1b04430464 Neptune system test tweaks (#37921)
1b04430464 is described below
commit 1b044304648157cc01d029416732b52645164dca
Author: D. Ferruzzi <[email protected]>
AuthorDate: Wed Mar 6 03:17:17 2024 -0800
Neptune system test tweaks (#37921)
* Neptune system test tweaks
Some small adjustments were needed to make this run for our dashboard.
* whitespace adjustment
---
.../system/providers/amazon/aws/example_neptune.py | 44 ++++++++++++++++------
1 file changed, 32 insertions(+), 12 deletions(-)
diff --git a/tests/system/providers/amazon/aws/example_neptune.py b/tests/system/providers/amazon/aws/example_neptune.py
index fc9d4226b5..81276c2dcd 100644
--- a/tests/system/providers/amazon/aws/example_neptune.py
+++ b/tests/system/providers/amazon/aws/example_neptune.py
@@ -16,10 +16,12 @@
# under the License.
from __future__ import annotations
-import pendulum
+from datetime import datetime
+from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
+from airflow.providers.amazon.aws.hooks.neptune import NeptuneHook
from airflow.providers.amazon.aws.operators.neptune import (
NeptuneStartDbClusterOperator,
NeptuneStopDbClusterOperator,
@@ -27,34 +29,52 @@ from airflow.providers.amazon.aws.operators.neptune import (
from tests.system.providers.amazon.aws.utils import SystemTestContextBuilder
DAG_ID = "example_neptune"
-# This test requires an existing Neptune cluster.
-CLUSTER_ID = "CLUSTER_ID"
-sys_test_context_task = SystemTestContextBuilder().add_variable(CLUSTER_ID).build()
+sys_test_context_task = SystemTestContextBuilder().build()
-with DAG(DAG_ID, schedule="@once", start_date=pendulum.datetime(2024, 1, 1, tz="UTC"), catchup=False) as dag:
+
+@task
+def create_cluster(cluster_id):
+ hook = NeptuneHook()
+ hook.conn.create_db_cluster(DBClusterIdentifier=cluster_id, Engine="neptune", DeletionProtection=False)
+ hook.wait_for_cluster_availability(cluster_id=cluster_id)
+
+
+@task
+def delete_cluster(cluster_id):
+ hook = NeptuneHook()
+ hook.conn.delete_db_cluster(DBClusterIdentifier=cluster_id, SkipFinalSnapshot=True)
+
+
+with DAG(
+ dag_id=DAG_ID,
+ start_date=datetime(2021, 1, 1),
+ schedule="@once",
+ catchup=False,
+ tags=["example"],
+) as dag:
test_context = sys_test_context_task()
+
env_id = test_context["ENV_ID"]
- cluster_id = test_context["CLUSTER_ID"]
+ cluster_id = f"{env_id}-cluster"
# [START howto_operator_start_neptune_cluster]
- start_cluster = NeptuneStartDbClusterOperator(
- task_id="start_task", db_cluster_id=cluster_id, deferrable=True
- )
+ start_cluster = NeptuneStartDbClusterOperator(task_id="start_task", db_cluster_id=cluster_id)
# [END howto_operator_start_neptune_cluster]
# [START howto_operator_stop_neptune_cluster]
- stop_cluster = NeptuneStopDbClusterOperator(
- task_id="stop_task", db_cluster_id=cluster_id, deferrable=True
- )
+ stop_cluster = NeptuneStopDbClusterOperator(task_id="stop_task", db_cluster_id=cluster_id)
# [END howto_operator_stop_neptune_cluster]
chain(
# TEST SETUP
test_context,
+ create_cluster(cluster_id),
# TEST BODY
start_cluster,
stop_cluster,
+ # TEST TEARDOWN
+ delete_cluster(cluster_id),
)
from tests.system.utils.watcher import watcher