hankehly commented on code in PR #26003:
URL: https://github.com/apache/airflow/pull/26003#discussion_r962437745
##########
airflow/providers/amazon/aws/sensors/rds.py:
##########
@@ -149,7 +161,53 @@ def poke(self, context: 'Context'):
return self._check_item(item_type='export_task',
item_name=self.export_task_identifier)
+class RdsDbSensor(RdsBaseSensor):
+ """
+ Waits for an RDS instance or cluster to enter one of a number of states
+
+ .. seealso::
+ For more information on how to use this sensor, take a look at the
guide:
+ :ref:`howto/sensor:RdsDbSensor`
+
+ :param db_type: Type of the DB - either "instance" or "cluster"
+ :param db_identifier: The AWS identifier for the DB
+ :param target_statuses: Target status of DB
+ """
+
+ def __init__(
+ self,
+ *,
+ db_identifier: str,
+ db_type: str = RdsDbType.INSTANCE,
+ target_statuses: Optional[List[str]] = None,
+ aws_conn_id: str = "aws_default",
+ **kwargs,
+ ):
+ super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+ self.db_identifier = db_identifier
+ self.target_statuses = target_statuses or ["available"]
+ self.db_type = RdsDbType(db_type)
+
+ def poke(self, context: 'Context'):
+ self.log.info(
+ "Poking for statuses : %s\nfor db instance %s",
self.target_statuses, self.db_identifier
+ )
+ item_type = self._check_item_type()
+ return self._check_item(item_type=item_type,
item_name=self.db_identifier)
+
+ def _check_status_field(self) -> str:
+ if self.db_type == RdsDbType.INSTANCE:
Review Comment:
Update (2022/09/05):
(https://github.com/apache/airflow/pull/26003/commits/0d537b1bdf1fa3dbd8a94f1b635a5814e84e11e9)
Added the `_check_status_field` and `_check_item_type` instance methods so
that the sensor accounts for the different DB types ("instance" and "cluster").
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]