josh-fell commented on code in PR #24099:
URL: https://github.com/apache/airflow/pull/24099#discussion_r900217194
##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -551,6 +551,106 @@ def execute(self, context: 'Context') -> str:
return json.dumps(delete_subscription, default=str)
+class RdsCreateDbInstanceOperator(RdsBaseOperator):
+ """
+ Creates an RDS DB instance
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:RdsCreateDbInstanceOperator`
+
+ :param db_instance_identifier: The DB instance identifier, must start with
a letter and
+ contain from 1 to 63 letters, numbers, or hyphens
+ :param db_instance_class: The compute and memory capacity of the DB
instance, for example db.m5.large
+ :param engine: The name of the database engine to be used for this instance
+ :param rds_kwargs: Named arguments to pass to boto3 RDS client function
``create_db_instance``
+
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds.html#RDS.Client.create_db_instance
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ :param wait_for_completion: Whether or not wait for creation of the DB
instance complete. (default: True)
+ """
+
+ def __init__(
+ self,
+ *,
+ db_instance_identifier: str,
+ db_instance_class: str,
+ engine: str,
+ rds_kwargs: Optional[Dict] = None,
+ aws_conn_id: str = "aws_default",
+ wait_for_completion: bool = True,
+ **kwargs,
+ ):
+ super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+
+ self.db_instance_identifier = db_instance_identifier
+ self.db_instance_class = db_instance_class
+ self.engine = engine
+ self.rds_kwargs = rds_kwargs or {}
+ self.wait_for_completion = wait_for_completion
+
+ def execute(self, context: 'Context') -> str:
+ self.log.info(f"Creating new DB instance
{self.db_instance_identifier}")
Review Comment:
```suggestion
self.log.info("Creating new DB instance %s",
self.db_instance_identifier)
```
In logging calls we should use %-formatting as much as possible so that the
formatting does not occur until it is absolutely needed.
##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -551,6 +551,106 @@ def execute(self, context: 'Context') -> str:
return json.dumps(delete_subscription, default=str)
+class RdsCreateDbInstanceOperator(RdsBaseOperator):
+ """
+ Creates an RDS DB instance
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:RdsCreateDbInstanceOperator`
+
+ :param db_instance_identifier: The DB instance identifier, must start with
a letter and
+ contain from 1 to 63 letters, numbers, or hyphens
+ :param db_instance_class: The compute and memory capacity of the DB
instance, for example db.m5.large
+ :param engine: The name of the database engine to be used for this instance
+ :param rds_kwargs: Named arguments to pass to boto3 RDS client function
``create_db_instance``
+
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds.html#RDS.Client.create_db_instance
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ :param wait_for_completion: Whether or not wait for creation of the DB
instance complete. (default: True)
Review Comment:
```suggestion
:param wait_for_completion: Whether or not wait for creation of the DB
instance to complete. (default: True)
```
Same comment for other instances as well.
##########
airflow/providers/amazon/aws/example_dags/example_dms.py:
##########
@@ -334,14 +309,26 @@ def clean_up():
)
# [END howto_operator_dms_delete_task]
+ delete_db_instance = RdsDeleteDbInstanceOperator(
+ task_id='delete_db_instance',
+ db_instance_identifier=RDS_INSTANCE_NAME,
+ rds_kwargs={
+ "SkipFinalSnapshot": True,
+ },
+ trigger_rule='all_done',
+ )
+
chain(
- set_up()
+ create_db_instance
+ >> create_sample_table()
+ >> create_dms_assets()
>> create_task
>> start_task
>> describe_tasks
>> await_task_start
>> stop_task
>> await_task_stop
>> delete_task
- >> clean_up()
+ >> delete_dms_assets()
+ >> delete_db_instance
)
Review Comment:
Should be:
```python
chain(
create_db_instance,
create_sample_table(),
create_dms_assets(),
create_task,
start_task,
describe_tasks,
await_task_start,
stop_task,
await_task_stop,
delete_task,
delete_dms_assets(),
delete_db_instance,
)
```
##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -551,6 +551,106 @@ def execute(self, context: 'Context') -> str:
return json.dumps(delete_subscription, default=str)
+class RdsCreateDbInstanceOperator(RdsBaseOperator):
+ """
+ Creates an RDS DB instance
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:RdsCreateDbInstanceOperator`
+
+ :param db_instance_identifier: The DB instance identifier, must start with
a letter and
+ contain from 1 to 63 letters, numbers, or hyphens
+ :param db_instance_class: The compute and memory capacity of the DB
instance, for example db.m5.large
+ :param engine: The name of the database engine to be used for this instance
+ :param rds_kwargs: Named arguments to pass to boto3 RDS client function
``create_db_instance``
+
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds.html#RDS.Client.create_db_instance
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ :param wait_for_completion: Whether or not wait for creation of the DB
instance complete. (default: True)
+ """
+
+ def __init__(
+ self,
+ *,
+ db_instance_identifier: str,
+ db_instance_class: str,
+ engine: str,
+ rds_kwargs: Optional[Dict] = None,
+ aws_conn_id: str = "aws_default",
+ wait_for_completion: bool = True,
+ **kwargs,
+ ):
+ super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+
+ self.db_instance_identifier = db_instance_identifier
+ self.db_instance_class = db_instance_class
+ self.engine = engine
+ self.rds_kwargs = rds_kwargs or {}
+ self.wait_for_completion = wait_for_completion
+
+ def execute(self, context: 'Context') -> str:
+ self.log.info(f"Creating new DB instance
{self.db_instance_identifier}")
+
+ create_db_instance = self.hook.conn.create_db_instance(
+ DBInstanceIdentifier=self.db_instance_identifier,
+ DBInstanceClass=self.db_instance_class,
+ Engine=self.engine,
+ **self.rds_kwargs,
+ )
+
+ if self.wait_for_completion:
+ self.hook.conn.get_waiter("db_instance_available").wait(
+ DBInstanceIdentifier=self.db_instance_identifier
+ )
+
+ return json.dumps(create_db_instance, default=str)
+
+
+class RdsDeleteDbInstanceOperator(RdsBaseOperator):
+ """
+ Deletes an RDS DB Instance
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:RdsDeleteDbInstanceOperator`
+
+ :param db_instance_identifier: The DB instance identifier for the DB
instance to be deleted
+ :param rds_kwargs: Named arguments to pass to boto3 RDS client function
``delete_db_instance``
+
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds.html#RDS.Client.delete_db_instance
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ :param wait_for_completion: Whether or not wait for deletion of the DB
instance complete. (default: True)
+ """
+
+ def __init__(
+ self,
+ *,
+ db_instance_identifier: str,
+ rds_kwargs: Optional[Dict] = None,
+ aws_conn_id: str = "aws_default",
+ wait_for_completion: bool = True,
+ **kwargs,
+ ):
+ super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+ self.db_instance_identifier = db_instance_identifier
+ self.rds_kwargs = rds_kwargs or {}
+ self.wait_for_completion = wait_for_completion
+
+ def execute(self, context: 'Context') -> str:
+ self.log.info(f"Deleting DB instance {self.db_instance_identifier}")
Review Comment:
```suggestion
self.log.info("Deleting DB instance %s", self.db_instance_identifier)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]