o-nikolas commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r996454757
##########
tests/system/providers/amazon/aws/example_rds_instance.py:
##########
@@ -71,6 +73,22 @@
)
# [END howto_sensor_rds_instance]
+ # [START howto_operator_rds_stop_db]
+ stop_db_instance = RdsStopDbOperator(
+ task_id="stop_db_instance",
+ db_identifier=rds_db_identifier,
+ wait_for_completion=True,
Review Comment:
`wait_for_completion=True` is the default value, so you can drop it from this
call (similar to how `RdsCreateDbInstanceOperator` does above).
##########
tests/providers/amazon/aws/operators/test_rds.py:
##########
@@ -767,3 +769,127 @@ def test_delete_event_subscription_no_wait(self,
mock_await_status):
with pytest.raises(self.hook.conn.exceptions.ClientError):
self.hook.conn.describe_db_instances(DBInstanceIdentifier=DB_INSTANCE_NAME)
assert mock_await_status.not_called()
+
+
+@pytest.mark.skipif(mock_rds is None, reason="mock_rds package not present")
+class TestRdsStopDbOperator:
+ @classmethod
+ def setup_class(cls):
+ cls.dag = DAG("test_dag", default_args={"owner": "airflow",
"start_date": DEFAULT_DATE})
+ cls.hook = RdsHook(aws_conn_id=AWS_CONN, region_name="us-east-1")
+ _patch_hook_get_connection(cls.hook)
+
+ @classmethod
+ def teardown_class(cls):
+ del cls.dag
+ del cls.hook
+
+ @mock_rds
+ @patch.object(RdsBaseOperator, "_await_status")
+ def test_stop_db_instance(self, mock_await_status):
+ _create_db_instance(self.hook)
+ stop_db_instance = RdsStopDbOperator(task_id="test_stop_db_instance",
db_identifier=DB_INSTANCE_NAME)
+ _patch_hook_get_connection(stop_db_instance.hook)
+ stop_db_instance.execute(None)
+ result =
self.hook.conn.describe_db_instances(DBInstanceIdentifier=DB_INSTANCE_NAME)
+ status = result["DBInstances"][0]["DBInstanceStatus"]
+ assert status == "stopped"
+ mock_await_status.assert_called()
+
+ @mock_rds
+ @patch.object(RdsBaseOperator, "_await_status")
+ def test_stop_db_instance_no_wait(self, mock_await_status):
+ _create_db_instance(self.hook)
+ stop_db_instance = RdsStopDbOperator(
+ task_id="test_stop_db_instance_no_wait",
db_identifier=DB_INSTANCE_NAME, wait_for_completion=False
+ )
+ _patch_hook_get_connection(stop_db_instance.hook)
+ stop_db_instance.execute(None)
+ result =
self.hook.conn.describe_db_instances(DBInstanceIdentifier=DB_INSTANCE_NAME)
+ status = result["DBInstances"][0]["DBInstanceStatus"]
+ assert status == "stopped"
+ mock_await_status.assert_not_called()
+
+ @mock_rds
+ def test_stop_db_instance_create_snapshot(self):
+ _create_db_instance(self.hook)
+ stop_db_instance = RdsStopDbOperator(
+ task_id="test_stop_db_instance_create_snapshot",
+ db_identifier=DB_INSTANCE_NAME,
+ db_snapshot_identifier=DB_INSTANCE_SNAPSHOT,
+ )
+ _patch_hook_get_connection(stop_db_instance.hook)
+ stop_db_instance.execute(None)
+
+ describe_result =
self.hook.conn.describe_db_instances(DBInstanceIdentifier=DB_INSTANCE_NAME)
+ status = describe_result["DBInstances"][0]['DBInstanceStatus']
+ assert status == "stopped"
+
+ snapshot_result =
self.hook.conn.describe_db_snapshots(DBSnapshotIdentifier=DB_INSTANCE_SNAPSHOT)
+ instance_snapshots = snapshot_result.get("DBSnapshots")
+ assert instance_snapshots
+ assert len(instance_snapshots) == 1
+
Review Comment:
It might be worth adding a test for stopping the DB cluster with a snapshot
and using the `caplog` fixture (there are examples in other Airflow tests) to
ensure your warning message is logged.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]