kazanzhy commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r997445302


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,132 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: 
"instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. 
(default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: 
True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)
+        start_db_response = self._start_db()
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info(f"Starting DB {self.db_type} '{self.db_identifier}'")
+        if self.db_type == RdsDbType.INSTANCE:
+            response = 
self.hook.conn.start_db_instance(DBInstanceIdentifier=self.db_identifier)
+        else:
+            response = 
self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+        return response
+
+    def _wait_until_db_available(self):

Review Comment:
   @eladkal I remember that I implemented `_await_status` so that a single 
method could be used by different operators. It was the best solution in my opinion at that 
time. I knew about waiters, but I guess not all of the waiters I needed were available.
   
   I agree with @ferruzzi: let's use `_await_status` in this PR, and then 
refactor the module and add waiters where possible. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to