vincbeck commented on code in PR #43988:
URL: https://github.com/apache/airflow/pull/43988#discussion_r1842564082


##########
providers/src/airflow/providers/amazon/aws/hooks/dms.py:
##########
@@ -219,3 +224,158 @@ def wait_for_task_status(self, replication_task_arn: str, status: DmsTaskWaiterS
             ],
             WithoutSettings=True,
         )
+
+    def describe_replication_configs(self, filters: list[dict] | None = None, **kwargs) -> list[dict]:
+        """
+        Return list of serverless replication configs.
+
+        .. seealso::
+            - :external+boto3:py:meth:`DatabaseMigrationService.Client.describe_replication_configs`
+
+        :param filters: List of filter objects
+        :return: List of replication configs
+        """
+        filters = filters if filters is not None else []
+
+        try:
+            resp = self.conn.describe_replication_configs(Filters=filters, **kwargs)
+            return resp.get("ReplicationConfigs", [])
+        except Exception as ex:
+            self.log.error("Error while describing replication configs: %s", str(ex))
+            return []

Review Comment:
   Do you really want to swallow any exception and return an empty list in that case? We might miss some issues.



##########
providers/src/airflow/providers/amazon/aws/hooks/dms.py:
##########
@@ -219,3 +224,158 @@ def wait_for_task_status(self, replication_task_arn: str, status: DmsTaskWaiterS
             ],
             WithoutSettings=True,
         )
+
+    def describe_replication_configs(self, filters: list[dict] | None = None, **kwargs) -> list[dict]:
+        """
+        Return list of serverless replication configs.
+
+        .. seealso::
+            - :external+boto3:py:meth:`DatabaseMigrationService.Client.describe_replication_configs`
+
+        :param filters: List of filter objects
+        :return: List of replication configs
+        """
+        filters = filters if filters is not None else []
+
+        try:
+            resp = self.conn.describe_replication_configs(Filters=filters, **kwargs)
+            return resp.get("ReplicationConfigs", [])
+        except Exception as ex:
+            self.log.error("Error while describing replication configs: %s", str(ex))
+            return []
+
+    def create_replication_config(
+        self,
+        replication_config_id: str,
+        source_endpoint_arn: str,
+        target_endpoint_arn: str,
+        compute_config: dict[str, Any],
+        replication_type: str,
+        table_mappings: str,
+        additional_config_kwargs: dict[str, Any] | None = None,
+        **kwargs,
+    ):
+        """
+        Create an AWS DMS Serverless replication configuration that can be used to start a DMS Serverless replication.
+
+        .. seealso::
+            - :external+boto3:py:meth:`DatabaseMigrationService.Client.create_replication_config`
+
+        :param replication_config_id: Unique identifier used to create a ReplicationConfigArn.
+        :param source_endpoint_arn: ARN of the source endpoint
+        :param target_endpoint_arn: ARN of the target endpoint
+        :param compute_config: Parameters for provisioning a DMS Serverless replication.
+        :param replication_type: Type of DMS Serverless replication
+        :param table_mappings: JSON table mappings
+        :param additional_config_kwargs: Additional parameters (e.g. ``Tags``, ``ResourceIdentifier``,
+            ``SupplementalSettings``, ``ReplicationSettings``) passed through to the API call
+
+        :return: ReplicationConfigArn
+
+        """
+        if additional_config_kwargs is None:
+            additional_config_kwargs = {}
+        try:
+            resp = self.conn.create_replication_config(
+                ReplicationConfigIdentifier=replication_config_id,
+                SourceEndpointArn=source_endpoint_arn,
+                TargetEndpointArn=target_endpoint_arn,
+                ComputeConfig=compute_config,
+                ReplicationType=replication_type,
+                TableMappings=table_mappings,
+                **additional_config_kwargs,
+            )
+            arn = resp.get("ReplicationConfig", {}).get("ReplicationConfigArn")
+            self.log.info("Successfully created replication config: %s", arn)
+            return arn
+
+        except ClientError as err:
+            err_str = f"Error: {err.response.get('Error', {}).get('Code', '')}: {err.response.get('Error', {}).get('Message', '')}"
+            self.log.error("Error while creating replication config: %s", err_str)
+            raise err
+
+    def describe_replications(self, filters: list[dict[str, Any]] | None = None, **kwargs) -> list[dict]:
+        """
+        Return list of serverless replications.
+
+        .. seealso::
+            - :external+boto3:py:meth:`DatabaseMigrationService.Client.describe_replications`
+
+        :param filters: List of filter objects
+        :return: List of replications
+        """
+        filters = filters if filters is not None else []
+        try:
+            resp = self.conn.describe_replications(Filters=filters, **kwargs)
+            return resp.get("Replications", [])
+        except Exception:
+            return []

Review Comment:
   Same



##########
providers/src/airflow/providers/amazon/aws/hooks/dms.py:
##########
@@ -219,3 +224,158 @@ def wait_for_task_status(self, replication_task_arn: str, status: DmsTaskWaiterS
             ],
             WithoutSettings=True,
         )
+

Review Comment:
   Overall I am wondering if it is worth creating all these methods; they are all just wrappers around boto3 APIs. You added some error management around them, but I am questioning whether it is worth creating new methods for that.
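For illustration, the wrapper-free alternative the comment alludes to: callers use the boto3 client exposed by the hook directly. `_FakeClient` and `_FakeHook` below are stand-ins (not the real `DmsHook`) so the snippet runs without AWS credentials.

```python
# Sketch of calling the boto3 client directly through the hook's `conn`
# property instead of adding a thin wrapper method. `_FakeClient` and
# `_FakeHook` are stand-ins so the snippet runs without AWS access.
class _FakeClient:
    def describe_replications(self, Filters, **kwargs):
        # The real client returns a dict with a "Replications" key.
        return {"Replications": [{"Status": "stopped"}]}


class _FakeHook:
    # Mirrors DmsHook.conn, which exposes the underlying boto3 client.
    conn = _FakeClient()


hook = _FakeHook()
# No hook-level wrapper: the caller uses the client API directly.
replications = hook.conn.describe_replications(Filters=[]).get("Replications", [])
```

The trade-off is between one extra indirection per API and keeping error handling and defaults (e.g. `Filters=[]`) in a single place.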



##########
providers/src/airflow/providers/amazon/aws/operators/dms.py:
##########
@@ -277,3 +288,551 @@ def execute(self, context: Context):
         """Stop AWS DMS replication task from Airflow."""
         
self.hook.stop_replication_task(replication_task_arn=self.replication_task_arn)
         self.log.info("DMS replication task(%s) is stopping.", 
self.replication_task_arn)
+
+
+class DmsDescribeReplicationConfigsOperator(AwsBaseOperator[DmsHook]):
+    """
+    Describes AWS DMS Serverless replication configurations.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DmsDescribeReplicationConfigsOperator`
+
+    :param filter: Filters block for filtering results.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    """
+
+    aws_hook_class = DmsHook
+    template_fields: Sequence[str] = aws_template_fields("filter")
+    template_fields_renderers = {"filter": "json"}
+
+    def __init__(
+        self,
+        *,
+        filter: list[dict] | None = None,
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.filter = filter
+
+    def execute(self, context: Context) -> list:
+        """
+        Describe AWS DMS replication configurations.
+
+        :return: List of replication configurations
+        """
+        return self.hook.describe_replication_configs(filters=self.filter)
+
+
+class DmsCreateReplicationConfigOperator(AwsBaseOperator[DmsHook]):
+    """
+    Creates an AWS DMS Serverless replication configuration.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DmsCreateReplicationConfigOperator`
+
+    :param replication_config_id: Unique identifier used to create a ReplicationConfigArn.
+    :param source_endpoint_arn: ARN of the source endpoint
+    :param target_endpoint_arn: ARN of the target endpoint
+    :param compute_config: Parameters for provisioning a DMS Serverless replication.
+    :param replication_type: Type of DMS Serverless replication
+    :param table_mappings: JSON table mappings
+    :param additional_config_kwargs: Additional configuration parameters for DMS Serverless
+        replication (e.g. ``Tags``), passed directly to the API
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    """
+
+    aws_hook_class = DmsHook
+    template_fields: Sequence[str] = aws_template_fields(
+        "replication_config_id",
+        "source_endpoint_arn",
+        "target_endpoint_arn",
+        "compute_config",
+        "replication_type",
+        "table_mappings",
+    )
+
+    template_fields_renderers = {"compute_config": "json", "tableMappings": 
"json"}
+
+    def __init__(
+        self,
+        *,
+        replication_config_id: str,
+        source_endpoint_arn: str,
+        target_endpoint_arn: str,
+        compute_config: dict[str, Any],
+        replication_type: str,
+        table_mappings: str,
+        additional_config_kwargs: dict | None = None,
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(
+            aws_conn_id=aws_conn_id,
+            **kwargs,
+        )
+
+        self.replication_config_id = replication_config_id
+        self.source_endpoint_arn = source_endpoint_arn
+        self.target_endpoint_arn = target_endpoint_arn
+        self.compute_config = compute_config
+        self.replication_type = replication_type
+        self.table_mappings = table_mappings
+        self.additional_config_kwargs = additional_config_kwargs or {}
+
+    def execute(self, context: Context) -> str:
+        resp = self.hook.create_replication_config(
+            replication_config_id=self.replication_config_id,
+            source_endpoint_arn=self.source_endpoint_arn,
+            target_endpoint_arn=self.target_endpoint_arn,
+            compute_config=self.compute_config,
+            replication_type=self.replication_type,
+            table_mappings=self.table_mappings,
+            additional_config_kwargs=self.additional_config_kwargs,
+        )
+
+        self.log.info("DMS replication config(%s) has been created.", 
self.replication_config_id)
+        return resp
+
+
+class DmsDeleteReplicationConfigOperator(AwsBaseOperator[DmsHook]):
+    """
+    Deletes an AWS DMS Serverless replication configuration.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DmsDeleteReplicationConfigOperator`
+
+    :param replication_config_arn: ARN of the replication config
+    :param wait_for_completion: If True, waits for the replication config to be deleted before returning.
+        If False, the operator returns immediately after the request is made.
+    :param deferrable: Run the operator in deferrable mode.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    """
+
+    aws_hook_class = DmsHook
+    template_fields: Sequence[str] = aws_template_fields("replication_config_arn")
+
+    VALID_STATES = ["failed", "stopped", "created"]
+    DELETING_STATES = ["deleting"]
+    TERMINAL_PROVISION_STATES = ["deprovisioned", ""]
+
+    def __init__(
+        self,
+        *,
+        replication_config_arn: str,
+        wait_for_completion: bool = True,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+        waiter_delay: int = 5,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(
+            aws_conn_id=aws_conn_id,
+            **kwargs,
+        )
+
+        self.replication_config_arn = replication_config_arn
+        self.wait_for_completion = wait_for_completion
+        self.deferrable = deferrable
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
+
+    def execute(self, context: Context) -> None:
+        results = self.hook.describe_replications(
+            filters=[{"Name": "replication-config-arn", "Values": 
[self.replication_config_arn]}]
+        )
+
+        if len(results) > 0:
+            current_state = results[0].get("Status", "")
+            self.log.info(
+                "Current state of replication config(%s) is %s.", 
self.replication_config_arn, current_state
+            )
+            # replication must be deprovisioned before deleting
+            provision_status = self.hook.get_provision_status(
+                replication_config_arn=self.replication_config_arn
+            )
+
+            if (
+                current_state.lower() in self.VALID_STATES
+                and provision_status in self.TERMINAL_PROVISION_STATES
+            ):
+                self.log.info("DMS replication config(%s) is in valid state.", 
self.replication_config_arn)
+
+                self.hook.delete_replication_config(
+                    replication_config_arn=self.replication_config_arn,
+                    delay=self.waiter_delay,
+                    max_attempts=self.waiter_max_attempts,
+                )
+                self.handle_delete_wait()

Review Comment:
   In order to simplify things I think you can remove that block of code. The `else` statement would also work if the replication config is in a valid state.
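The suggested simplification boils down to a single predicate: wait whenever the replication is not yet in a deletable state, otherwise delete straight away. A hedged sketch of that condition (the state lists are copied from the operator under review; the helper name is hypothetical):

```python
# Sketch of the single condition the simplified flow would hinge on; the
# state lists are copied from the operator under review.
VALID_STATES = ["failed", "stopped", "created"]
TERMINAL_PROVISION_STATES = ["deprovisioned", ""]


def must_wait_before_delete(current_state: str, provision_status: str) -> bool:
    """True when the replication must reach a terminal, deprovisioned state first."""
    return (
        current_state.lower() not in VALID_STATES
        or provision_status not in TERMINAL_PROVISION_STATES
    )
```

When the config is already terminal and deprovisioned, the wait branch is effectively a no-op, which is why the fast-path `if` block can be dropped.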



##########
providers/src/airflow/providers/amazon/aws/operators/dms.py:
##########
@@ -277,3 +288,551 @@ def execute(self, context: Context):
         """Stop AWS DMS replication task from Airflow."""
         
self.hook.stop_replication_task(replication_task_arn=self.replication_task_arn)
         self.log.info("DMS replication task(%s) is stopping.", 
self.replication_task_arn)
+
+
+class DmsDescribeReplicationConfigsOperator(AwsBaseOperator[DmsHook]):
+    """
+    Describes AWS DMS Serverless replication configurations.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DmsDescribeReplicationConfigsOperator`
+
+    :param filter: Filters block for filtering results.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    """
+
+    aws_hook_class = DmsHook
+    template_fields: Sequence[str] = aws_template_fields("filter")
+    template_fields_renderers = {"filter": "json"}
+
+    def __init__(
+        self,
+        *,
+        filter: list[dict] | None = None,
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.filter = filter
+
+    def execute(self, context: Context) -> list:
+        """
+        Describe AWS DMS replication configurations.
+
+        :return: List of replication configurations
+        """
+        return self.hook.describe_replication_configs(filters=self.filter)
+
+
+class DmsCreateReplicationConfigOperator(AwsBaseOperator[DmsHook]):
+    """
+    Creates an AWS DMS Serverless replication configuration.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DmsCreateReplicationConfigOperator`
+
+    :param replication_config_id: Unique identifier used to create a ReplicationConfigArn.
+    :param source_endpoint_arn: ARN of the source endpoint
+    :param target_endpoint_arn: ARN of the target endpoint
+    :param compute_config: Parameters for provisioning a DMS Serverless replication.
+    :param replication_type: Type of DMS Serverless replication
+    :param table_mappings: JSON table mappings
+    :param additional_config_kwargs: Additional configuration parameters for DMS Serverless
+        replication (e.g. ``Tags``), passed directly to the API
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    """
+
+    aws_hook_class = DmsHook
+    template_fields: Sequence[str] = aws_template_fields(
+        "replication_config_id",
+        "source_endpoint_arn",
+        "target_endpoint_arn",
+        "compute_config",
+        "replication_type",
+        "table_mappings",
+    )
+
+    template_fields_renderers = {"compute_config": "json", "tableMappings": 
"json"}
+
+    def __init__(
+        self,
+        *,
+        replication_config_id: str,
+        source_endpoint_arn: str,
+        target_endpoint_arn: str,
+        compute_config: dict[str, Any],
+        replication_type: str,
+        table_mappings: str,
+        additional_config_kwargs: dict | None = None,
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(
+            aws_conn_id=aws_conn_id,
+            **kwargs,
+        )
+
+        self.replication_config_id = replication_config_id
+        self.source_endpoint_arn = source_endpoint_arn
+        self.target_endpoint_arn = target_endpoint_arn
+        self.compute_config = compute_config
+        self.replication_type = replication_type
+        self.table_mappings = table_mappings
+        self.additional_config_kwargs = additional_config_kwargs or {}
+
+    def execute(self, context: Context) -> str:
+        resp = self.hook.create_replication_config(
+            replication_config_id=self.replication_config_id,
+            source_endpoint_arn=self.source_endpoint_arn,
+            target_endpoint_arn=self.target_endpoint_arn,
+            compute_config=self.compute_config,
+            replication_type=self.replication_type,
+            table_mappings=self.table_mappings,
+            additional_config_kwargs=self.additional_config_kwargs,
+        )
+
+        self.log.info("DMS replication config(%s) has been created.", 
self.replication_config_id)
+        return resp
+
+
+class DmsDeleteReplicationConfigOperator(AwsBaseOperator[DmsHook]):
+    """
+    Deletes an AWS DMS Serverless replication configuration.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DmsDeleteReplicationConfigOperator`
+
+    :param replication_config_arn: ARN of the replication config
+    :param wait_for_completion: If True, waits for the replication config to be deleted before returning.
+        If False, the operator returns immediately after the request is made.
+    :param deferrable: Run the operator in deferrable mode.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    """
+
+    aws_hook_class = DmsHook
+    template_fields: Sequence[str] = aws_template_fields("replication_config_arn")
+
+    VALID_STATES = ["failed", "stopped", "created"]
+    DELETING_STATES = ["deleting"]
+    TERMINAL_PROVISION_STATES = ["deprovisioned", ""]
+
+    def __init__(
+        self,
+        *,
+        replication_config_arn: str,
+        wait_for_completion: bool = True,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+        waiter_delay: int = 5,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(
+            aws_conn_id=aws_conn_id,
+            **kwargs,
+        )
+
+        self.replication_config_arn = replication_config_arn
+        self.wait_for_completion = wait_for_completion
+        self.deferrable = deferrable
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
+
+    def execute(self, context: Context) -> None:
+        results = self.hook.describe_replications(
+            filters=[{"Name": "replication-config-arn", "Values": 
[self.replication_config_arn]}]
+        )
+
+        if len(results) > 0:
+            current_state = results[0].get("Status", "")
+            self.log.info(
+                "Current state of replication config(%s) is %s.", 
self.replication_config_arn, current_state
+            )
+            # replication must be deprovisioned before deleting
+            provision_status = self.hook.get_provision_status(
+                replication_config_arn=self.replication_config_arn
+            )
+
+            if (
+                current_state.lower() in self.VALID_STATES
+                and provision_status in self.TERMINAL_PROVISION_STATES
+            ):
+                self.log.info("DMS replication config(%s) is in valid state.", 
self.replication_config_arn)
+
+                self.hook.delete_replication_config(
+                    replication_config_arn=self.replication_config_arn,
+                    delay=self.waiter_delay,
+                    max_attempts=self.waiter_max_attempts,
+                )
+                self.handle_delete_wait()
+
+            else:
+                # Must be in a terminal state to delete
+                self.log.info(
+                    "DMS replication config(%s) cannot be deleted until 
replication is in terminal state and deprovisioned. Waiting for terminal 
state.",
+                    self.replication_config_arn,
+                )
+                if self.deferrable:
+                    if current_state.lower() not in self.VALID_STATES:
+                        self.log.info("Deferring until terminal status 
reached.")
+                        self.defer(
+                            trigger=DmsReplicationTerminalStatusTrigger(
+                                replication_config_arn=self.replication_config_arn,
+                                waiter_delay=self.waiter_delay,
+                                waiter_max_attempts=self.waiter_max_attempts,
+                                aws_conn_id=self.aws_conn_id,
+                            ),
+                            method_name="retry_execution",
+                        )
+                    if provision_status not in self.TERMINAL_PROVISION_STATES:  # not deprovisioned
+                        self.log.info("Deferring until deprovisioning completes.")
+                        self.defer(
+                            trigger=DmsReplicationDeprovisionedTrigger(
+                                replication_config_arn=self.replication_config_arn,
+                                waiter_delay=self.waiter_delay,
+                                waiter_max_attempts=self.waiter_max_attempts,
+                                aws_conn_id=self.aws_conn_id,
+                            ),
+                            method_name="retry_execution",
+                        )
+
+                else:
+                    self.hook.get_waiter("replication_terminal_status").wait(
+                        Filters=[{"Name": "replication-config-arn", "Values": 
[self.replication_config_arn]}],
+                        WaiterConfig={"Delay": self.waiter_delay, 
"MaxAttempts": self.waiter_max_attempts},
+                    )
+                    self.hook.get_waiter("replication_deprovisioned").wait(
+                        Filters=[{"Name": "replication-config-arn", "Values": 
[self.replication_config_arn]}],
+                        WaiterConfig={"Delay": self.waiter_delay, 
"MaxAttempts": self.waiter_max_attempts},
+                    )
+                    
self.hook.delete_replication_config(self.replication_config_arn)
+                    self.handle_delete_wait()
+
+        else:
+            self.log.info("DMS replication config(%s) does not exist.", 
self.replication_config_arn)
+
+    def handle_delete_wait(self):
+        if self.deferrable:
+            self.log.info("Deferring until replication config is deleted.")
+            self.defer(
+                trigger=DmsReplicationConfigDeletedTrigger(
+                    replication_config_arn=self.replication_config_arn,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                    aws_conn_id=self.aws_conn_id,
+                ),
+                method_name="execute_complete",
+            )
+
+        if self.wait_for_completion:
+            self.hook.get_waiter("replication_config_deleted").wait(
+                Filters=[{"Name": "replication-config-arn", "Values": 
[self.replication_config_arn]}],
+                WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": 
self.waiter_max_attempts},
+            )
+            self.log.info("DMS replication config(%s) deleted.", 
self.replication_config_arn)
+
+    def execute_complete(self, context, event=None):
+        self.replication_config_arn = event.get("replication_config_arn")
+        self.log.info("DMS replication config(%s) deleted.", 
self.replication_config_arn)
+
+    def retry_execution(self, context, event=None):
+        self.replication_config_arn = event.get("replication_config_arn")
+        self.log.info("Retrying replication config(%s) deletion.", 
self.replication_config_arn)
+        self.execute(context)
+
+
+class DmsDescribeReplicationsOperator(AwsBaseOperator[DmsHook]):
+    """
+    Describes AWS DMS Serverless replications.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DmsDescribeReplicationsOperator`
+
+    :param filter: Filters block for filtering results.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    """
+
+    aws_hook_class = DmsHook
+    template_fields: Sequence[str] = aws_template_fields("filter")
+    template_fields_renderer = {"filter": "json"}
+
+    def __init__(
+        self,
+        *,
+        filter: list[dict[str, Any]] | None = None,
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(
+            aws_conn_id=aws_conn_id,
+            **kwargs,
+        )
+
+        self.filter = filter
+
+    def execute(self, context: Context) -> list[dict[str, Any]]:
+        """
+        Describe AWS DMS replications.
+
+        :return: Replications
+        """
+        return self.hook.describe_replications(filters=self.filter)
+
+
+class DmsStartReplicationOperator(AwsBaseOperator[DmsHook]):
+    """
+    Starts an AWS DMS Serverless replication.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DmsStartReplicationOperator`
+
+    :param replication_config_arn: ARN of the replication config
+    :param replication_start_type: Type of replication.
+    :param cdc_start_time: Start time of CDC
+    :param cdc_start_pos: Indicates when to start CDC.
+    :param cdc_stop_pos: Indicates when to stop CDC.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    """
+
+    RUNNING_STATES = ["running"]
+    STARTABLE_STATES = ["stopped", "failed", "created"]
+    TERMINAL_STATES = ["failed", "stopped", "created"]
+    TERMINAL_PROVISION_STATES = ["deprovisioned", ""]
+
+    aws_hook_class = DmsHook
+    template_fields: Sequence[str] = aws_template_fields(
+        "replication_config_arn", "replication_start_type", "cdc_start_time", 
"cdc_start_pos", "cdc_stop_pos"
+    )
+
+    def __init__(
+        self,
+        *,
+        replication_config_arn: str,
+        replication_start_type: str,
+        cdc_start_time: datetime | str | None = None,
+        cdc_start_pos: str | None = None,
+        cdc_stop_pos: str | None = None,
+        wait_for_completion: bool = True,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(
+            aws_conn_id=aws_conn_id,
+            **kwargs,
+        )
+
+        self.replication_config_arn = replication_config_arn
+        self.replication_start_type = replication_start_type
+        self.cdc_start_time = cdc_start_time
+        self.cdc_start_pos = cdc_start_pos
+        self.cdc_stop_pos = cdc_stop_pos
+        self.deferrable = deferrable
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
+        self.wait_for_completion = wait_for_completion
+
+        if self.cdc_start_time and self.cdc_start_pos:
+            raise AirflowException("Only one of cdc_start_time or 
cdc_start_pos should be provided.")
+
+    def execute(self, context: Context):
+        result = self.hook.describe_replications(
+            filters=[{"Name": "replication-config-arn", "Values": 
[self.replication_config_arn]}]
+        )
+
+        try:
+            current_status = result[0].get("Status", "")
+        except Exception as ex:
+            self.log.error("Error while getting replication status: %s. Unable 
to start replication", str(ex))
+            raise ex

Review Comment:
   Same here, I dont think it is necessary
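Something like the following would express the same lookup without the try/except (hypothetical helper, not part of the PR): an empty describe result yields an empty status, which the caller can then reject explicitly.

```python
# Hypothetical helper replacing the try/except: an empty describe result
# simply yields an empty status string instead of raising an IndexError.
from typing import Any


def current_replication_status(replications: list[dict[str, Any]]) -> str:
    """Return the status of the first replication, or "" when none matched."""
    return replications[0].get("Status", "") if replications else ""
```

The operator can then raise a single, descriptive `AirflowException` when the status is empty rather than re-raising a bare `IndexError`.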



##########
providers/src/airflow/providers/amazon/aws/operators/dms.py:
##########
@@ -277,3 +288,551 @@ def execute(self, context: Context):
         """Stop AWS DMS replication task from Airflow."""
         
self.hook.stop_replication_task(replication_task_arn=self.replication_task_arn)
         self.log.info("DMS replication task(%s) is stopping.", 
self.replication_task_arn)
+
+
+class DmsDescribeReplicationConfigsOperator(AwsBaseOperator[DmsHook]):
+    """
+    Describes AWS DMS Serverless replication configurations.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DmsDescribeReplicationConfigsOperator`
+
+    :param filter: Filters block for filtering results.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    """
+
+    aws_hook_class = DmsHook
+    template_fields: Sequence[str] = aws_template_fields("filter")
+    template_fields_renderers = {"filter": "json"}
+
+    def __init__(
+        self,
+        *,
+        filter: list[dict] | None = None,
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.filter = filter
+
+    def execute(self, context: Context) -> list:
+        """
+        Describe AWS DMS replication configurations.
+
+        :return: List of replication configurations
+        """
+        return self.hook.describe_replication_configs(filters=self.filter)
+
+
+class DmsCreateReplicationConfigOperator(AwsBaseOperator[DmsHook]):
+    """
+    Creates an AWS DMS Serverless replication configuration.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DmsCreateReplicationConfigOperator`
+
+    :param replication_config_id: Unique identifier used to create a ReplicationConfigArn.
+    :param source_endpoint_arn: ARN of the source endpoint
+    :param target_endpoint_arn: ARN of the target endpoint
+    :param compute_config: Parameters for provisioning a DMS Serverless replication.
+    :param replication_type: Type of DMS Serverless replication.
+    :param table_mappings: JSON table mappings
+    :param additional_config_kwargs: Additional configuration parameters for
+        DMS Serverless replication. Passed directly to the API.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    """
+
+    aws_hook_class = DmsHook
+    template_fields: Sequence[str] = aws_template_fields(
+        "replication_config_id",
+        "source_endpoint_arn",
+        "target_endpoint_arn",
+        "compute_config",
+        "replication_type",
+        "table_mappings",
+    )
+
+    template_fields_renderers = {"compute_config": "json", "table_mappings": "json"}
+
+    def __init__(
+        self,
+        *,
+        replication_config_id: str,
+        source_endpoint_arn: str,
+        target_endpoint_arn: str,
+        compute_config: dict[str, Any],
+        replication_type: str,
+        table_mappings: str,
+        additional_config_kwargs: dict | None = None,
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(
+            aws_conn_id=aws_conn_id,
+            **kwargs,
+        )
+
+        self.replication_config_id = replication_config_id
+        self.source_endpoint_arn = source_endpoint_arn
+        self.target_endpoint_arn = target_endpoint_arn
+        self.compute_config = compute_config
+        self.replication_type = replication_type
+        self.table_mappings = table_mappings
+        self.additional_config_kwargs = additional_config_kwargs or {}
+
+    def execute(self, context: Context) -> str:
+        resp = self.hook.create_replication_config(
+            replication_config_id=self.replication_config_id,
+            source_endpoint_arn=self.source_endpoint_arn,
+            target_endpoint_arn=self.target_endpoint_arn,
+            compute_config=self.compute_config,
+            replication_type=self.replication_type,
+            table_mappings=self.table_mappings,
+            additional_config_kwargs=self.additional_config_kwargs,
+        )
+
+        self.log.info("DMS replication config(%s) has been created.", self.replication_config_id)
+        return resp
+
+
+class DmsDeleteReplicationConfigOperator(AwsBaseOperator[DmsHook]):
+    """
+    Deletes an AWS DMS Serverless replication configuration.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DmsDeleteReplicationConfigOperator`
+
+    :param replication_config_arn: ARN of the replication config
+    :param wait_for_completion: If True, waits for the replication config to be deleted before returning.
+        If False, the operator will return immediately after the request is made.
+    :param deferrable: Run the operator in deferrable mode.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    """
+
+    aws_hook_class = DmsHook
+    template_fields: Sequence[str] = aws_template_fields("replication_config_arn")
+
+    VALID_STATES = ["failed", "stopped", "created"]
+    DELETING_STATES = ["deleting"]
+    TERMINAL_PROVISION_STATES = ["deprovisioned", ""]
+
+    def __init__(
+        self,
+        *,
+        replication_config_arn: str,
+        wait_for_completion: bool = True,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+        waiter_delay: int = 5,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(
+            aws_conn_id=aws_conn_id,
+            **kwargs,
+        )
+
+        self.replication_config_arn = replication_config_arn
+        self.wait_for_completion = wait_for_completion
+        self.deferrable = deferrable
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
+
+    def execute(self, context: Context) -> None:
+        results = self.hook.describe_replications(
+            filters=[{"Name": "replication-config-arn", "Values": [self.replication_config_arn]}]
+        )
+
+        if len(results) > 0:
+            current_state = results[0].get("Status", "")
+            self.log.info(
+                "Current state of replication config(%s) is %s.", self.replication_config_arn, current_state
+            )
+            # replication must be deprovisioned before deleting
+            provision_status = self.hook.get_provision_status(
+                replication_config_arn=self.replication_config_arn
+            )
+
+            if (
+                current_state.lower() in self.VALID_STATES
+                and provision_status in self.TERMINAL_PROVISION_STATES
+            ):
+                self.log.info("DMS replication config(%s) is in valid state.", self.replication_config_arn)
+
+                self.hook.delete_replication_config(
+                    replication_config_arn=self.replication_config_arn,
+                    delay=self.waiter_delay,
+                    max_attempts=self.waiter_max_attempts,
+                )
+                self.handle_delete_wait()
+
+            else:
+                # Must be in a terminal state to delete
+                self.log.info(
+                    "DMS replication config(%s) cannot be deleted until replication is in terminal state and deprovisioned. Waiting for terminal state.",
+                    self.replication_config_arn,
+                )
+                if self.deferrable:
+                    if current_state.lower() not in self.VALID_STATES:
+                        self.log.info("Deferring until terminal status reached.")
+                        self.defer(
+                            trigger=DmsReplicationTerminalStatusTrigger(
+                                replication_config_arn=self.replication_config_arn,
+                                waiter_delay=self.waiter_delay,
+                                waiter_max_attempts=self.waiter_max_attempts,
+                                aws_conn_id=self.aws_conn_id,
+                            ),
+                            method_name="retry_execution",
+                        )
+                    if provision_status not in self.TERMINAL_PROVISION_STATES:  # not deprovisioned
+                        self.log.info("Deferring until deprovisioning completes.")
+                        self.defer(
+                            trigger=DmsReplicationDeprovisionedTrigger(
+                                replication_config_arn=self.replication_config_arn,
+                                waiter_delay=self.waiter_delay,
+                                waiter_max_attempts=self.waiter_max_attempts,
+                                aws_conn_id=self.aws_conn_id,
+                            ),
+                            method_name="retry_execution",
+                        )
+
+                else:
+                    self.hook.get_waiter("replication_terminal_status").wait(
+                        Filters=[{"Name": "replication-config-arn", "Values": [self.replication_config_arn]}],
+                        WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": self.waiter_max_attempts},
+                    )
+                    self.hook.get_waiter("replication_deprovisioned").wait(
+                        Filters=[{"Name": "replication-config-arn", "Values": [self.replication_config_arn]}],
+                        WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": self.waiter_max_attempts},
+                    )
+                    self.hook.delete_replication_config(self.replication_config_arn)
+                    self.handle_delete_wait()
+
+        else:
+            self.log.info("DMS replication config(%s) does not exist.", self.replication_config_arn)
+
+    def handle_delete_wait(self):
+        if self.deferrable:
+            self.log.info("Deferring until replication config is deleted.")
+            self.defer(
+                trigger=DmsReplicationConfigDeletedTrigger(
+                    replication_config_arn=self.replication_config_arn,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                    aws_conn_id=self.aws_conn_id,
+                ),
+                method_name="execute_complete",
+            )
+
+        if self.wait_for_completion:
+            self.hook.get_waiter("replication_config_deleted").wait(
+                Filters=[{"Name": "replication-config-arn", "Values": [self.replication_config_arn]}],
+                WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": self.waiter_max_attempts},
+            )
+            self.log.info("DMS replication config(%s) deleted.", self.replication_config_arn)
+
+    def execute_complete(self, context, event=None):
+        self.replication_config_arn = event.get("replication_config_arn")
+        self.log.info("DMS replication config(%s) deleted.", self.replication_config_arn)
+
+    def retry_execution(self, context, event=None):
+        self.replication_config_arn = event.get("replication_config_arn")
+        self.log.info("Retrying replication config(%s) deletion.", self.replication_config_arn)
+        self.execute(context)
+
+
+class DmsDescribeReplicationsOperator(AwsBaseOperator[DmsHook]):
+    """
+    Describes AWS DMS Serverless replications.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DmsDescribeReplicationsOperator`
+
+    :param filter: Filters block for filtering results.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    """
+
+    aws_hook_class = DmsHook
+    template_fields: Sequence[str] = aws_template_fields("filter")
+    template_fields_renderers = {"filter": "json"}
+
+    def __init__(
+        self,
+        *,
+        filter: list[dict[str, Any]] | None = None,
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(
+            aws_conn_id=aws_conn_id,
+            **kwargs,
+        )
+
+        self.filter = filter
+
+    def execute(self, context: Context) -> list[dict[str, Any]]:
+        """
+        Describe AWS DMS replications.
+
+        :return: Replications
+        """
+        return self.hook.describe_replications(filters=self.filter)
+
+
+class DmsStartReplicationOperator(AwsBaseOperator[DmsHook]):
+    """
+    Starts an AWS DMS Serverless replication.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DmsStartReplicationOperator`
+
+    :param replication_config_arn: ARN of the replication config
+    :param replication_start_type: Type of replication.
+    :param cdc_start_time: Start time of CDC
+    :param cdc_start_pos: Indicates when to start CDC.
+    :param cdc_stop_pos: Indicates when to stop CDC.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    """
+
+    RUNNING_STATES = ["running"]
+    STARTABLE_STATES = ["stopped", "failed", "created"]
+    TERMINAL_STATES = ["failed", "stopped", "created"]
+    TERMINAL_PROVISION_STATES = ["deprovisioned", ""]
+
+    aws_hook_class = DmsHook
+    template_fields: Sequence[str] = aws_template_fields(
+        "replication_config_arn", "replication_start_type", "cdc_start_time", "cdc_start_pos", "cdc_stop_pos"
+    )
+
+    def __init__(
+        self,
+        *,
+        replication_config_arn: str,
+        replication_start_type: str,
+        cdc_start_time: datetime | str | None = None,
+        cdc_start_pos: str | None = None,
+        cdc_stop_pos: str | None = None,
+        wait_for_completion: bool = True,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(
+            aws_conn_id=aws_conn_id,
+            **kwargs,
+        )
+
+        self.replication_config_arn = replication_config_arn
+        self.replication_start_type = replication_start_type
+        self.cdc_start_time = cdc_start_time
+        self.cdc_start_pos = cdc_start_pos
+        self.cdc_stop_pos = cdc_stop_pos
+        self.deferrable = deferrable
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
+        self.wait_for_completion = wait_for_completion
+
+        if self.cdc_start_time and self.cdc_start_pos:
+            raise AirflowException("Only one of cdc_start_time or cdc_start_pos should be provided.")
+
+    def execute(self, context: Context):
+        result = self.hook.describe_replications(
+            filters=[{"Name": "replication-config-arn", "Values": [self.replication_config_arn]}]
+        )
+
+        try:
+            current_status = result[0].get("Status", "")
+        except Exception as ex:
+            self.log.error("Error while getting replication status: %s. Unable to start replication", str(ex))
+            raise ex
+
+        provision_status = self.hook.get_provision_status(replication_config_arn=self.replication_config_arn)
+
+        if provision_status == "deprovisioning":
+            # wait for deprovisioning to complete before start/restart
+            self.log.info(
+                "Replication is deprovisioning. Must wait for deprovisioning before running replication"
+            )
+            if self.deferrable:
+                self.log.info("Deferring until deprovisioning completes.")
+                self.defer(
+                    trigger=DmsReplicationDeprovisionedTrigger(
+                        replication_config_arn=self.replication_config_arn,
+                        waiter_delay=self.waiter_delay,
+                        waiter_max_attempts=self.waiter_max_attempts,
+                        aws_conn_id=self.aws_conn_id,
+                    ),
+                    method_name="retry_execution",
+                )

Review Comment:
   This comment also applies to the operator above. I am wondering if you should not split this operator in two: one sensor would wait for the deprovisioning to complete, and this operator would fail if the status is not terminal before actually doing it. I just think this operator does a lot, and the user does not necessarily know that the operator could potentially wait for some time (with a cost) even though `wait_for_completion` is `False`
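   A rough sketch of the suggested split (plain Python; the sensor class named in the comment below is hypothetical, not part of this PR): the start operator would fail fast on a non-terminal provision status, and any waiting would belong to a dedicated upstream sensor.

   ```python
   # States in which a serverless replication can be (re)started, mirroring
   # TERMINAL_PROVISION_STATES in the PR.
   TERMINAL_PROVISION_STATES = ("deprovisioned", "")

   def start_replication_or_fail(provision_status: str) -> str:
       if provision_status not in TERMINAL_PROVISION_STATES:
           # Fail immediately; a hypothetical DmsReplicationDeprovisionedSensor
           # scheduled upstream would do the waiting instead.
           raise RuntimeError(
               f"Provision status {provision_status!r} is not terminal; wait with a sensor before starting."
           )
       return "starting"
   ```

   This keeps the cost of waiting visible in the DAG rather than hidden inside the operator.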



##########
providers/src/airflow/providers/amazon/aws/operators/dms.py:
##########
@@ -277,3 +288,551 @@ def execute(self, context: Context):
         """Stop AWS DMS replication task from Airflow."""
         self.hook.stop_replication_task(replication_task_arn=self.replication_task_arn)
         self.log.info("DMS replication task(%s) is stopping.", self.replication_task_arn)
+
+
+class DmsDescribeReplicationConfigsOperator(AwsBaseOperator[DmsHook]):
+    """
+    Describes AWS DMS Serverless replication configurations.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DmsDescribeReplicationConfigsOperator`
+
+    :param filter: Filters block for filtering results.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    """
+
+    aws_hook_class = DmsHook
+    template_fields: Sequence[str] = aws_template_fields("filter")
+    template_fields_renderers = {"filter": "json"}
+
+    def __init__(
+        self,
+        *,
+        filter: list[dict] | None = None,
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.filter = filter
+
+    def execute(self, context: Context) -> list:
+        """
+        Describe AWS DMS replication configurations.
+
+        :return: List of replication configurations
+        """
+        return self.hook.describe_replication_configs(filters=self.filter)
+
+
+class DmsCreateReplicationConfigOperator(AwsBaseOperator[DmsHook]):
+    """
+    Creates an AWS DMS Serverless replication configuration.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DmsCreateReplicationConfigOperator`
+
+    :param replication_config_id: Unique identifier used to create a ReplicationConfigArn.
+    :param source_endpoint_arn: ARN of the source endpoint
+    :param target_endpoint_arn: ARN of the target endpoint
+    :param compute_config: Parameters for provisioning a DMS Serverless replication.
+    :param replication_type: Type of DMS Serverless replication.
+    :param table_mappings: JSON table mappings
+    :param additional_config_kwargs: Additional configuration parameters for
+        DMS Serverless replication. Passed directly to the API.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    """
+
+    aws_hook_class = DmsHook
+    template_fields: Sequence[str] = aws_template_fields(
+        "replication_config_id",
+        "source_endpoint_arn",
+        "target_endpoint_arn",
+        "compute_config",
+        "replication_type",
+        "table_mappings",
+    )
+
+    template_fields_renderers = {"compute_config": "json", "table_mappings": "json"}
+
+    def __init__(
+        self,
+        *,
+        replication_config_id: str,
+        source_endpoint_arn: str,
+        target_endpoint_arn: str,
+        compute_config: dict[str, Any],
+        replication_type: str,
+        table_mappings: str,
+        additional_config_kwargs: dict | None = None,
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(
+            aws_conn_id=aws_conn_id,
+            **kwargs,
+        )
+
+        self.replication_config_id = replication_config_id
+        self.source_endpoint_arn = source_endpoint_arn
+        self.target_endpoint_arn = target_endpoint_arn
+        self.compute_config = compute_config
+        self.replication_type = replication_type
+        self.table_mappings = table_mappings
+        self.additional_config_kwargs = additional_config_kwargs or {}
+
+    def execute(self, context: Context) -> str:
+        resp = self.hook.create_replication_config(
+            replication_config_id=self.replication_config_id,
+            source_endpoint_arn=self.source_endpoint_arn,
+            target_endpoint_arn=self.target_endpoint_arn,
+            compute_config=self.compute_config,
+            replication_type=self.replication_type,
+            table_mappings=self.table_mappings,
+            additional_config_kwargs=self.additional_config_kwargs,
+        )
+
+        self.log.info("DMS replication config(%s) has been created.", self.replication_config_id)
+        return resp
+
+
+class DmsDeleteReplicationConfigOperator(AwsBaseOperator[DmsHook]):
+    """
+    Deletes an AWS DMS Serverless replication configuration.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DmsDeleteReplicationConfigOperator`
+
+    :param replication_config_arn: ARN of the replication config
+    :param wait_for_completion: If True, waits for the replication config to be deleted before returning.
+        If False, the operator will return immediately after the request is made.
+    :param deferrable: Run the operator in deferrable mode.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    """
+
+    aws_hook_class = DmsHook
+    template_fields: Sequence[str] = aws_template_fields("replication_config_arn")
+
+    VALID_STATES = ["failed", "stopped", "created"]
+    DELETING_STATES = ["deleting"]
+    TERMINAL_PROVISION_STATES = ["deprovisioned", ""]
+
+    def __init__(
+        self,
+        *,
+        replication_config_arn: str,
+        wait_for_completion: bool = True,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+        waiter_delay: int = 5,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(
+            aws_conn_id=aws_conn_id,
+            **kwargs,
+        )
+
+        self.replication_config_arn = replication_config_arn
+        self.wait_for_completion = wait_for_completion
+        self.deferrable = deferrable
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
+
+    def execute(self, context: Context) -> None:
+        results = self.hook.describe_replications(
+            filters=[{"Name": "replication-config-arn", "Values": [self.replication_config_arn]}]
+        )
+
+        if len(results) > 0:

Review Comment:
   Do we need this? I would expect `self.hook.get_provision_status` to fail if the replication does not exist, which would make sense in that scenario.
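   A minimal illustration of this point (plain Python, names invented, not the PR code): when the status lookup itself raises for an unknown ARN, a separate `len(results) > 0` existence check upstream is redundant.

   ```python
   # Stand-in for a get_provision_status-style lookup: a missing ARN raises
   # KeyError on its own, signalling "replication config does not exist"
   # without any explicit guard.
   def provision_status(configs_by_arn: dict[str, str], arn: str) -> str:
       return configs_by_arn[arn]
   ```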



##########
providers/src/airflow/providers/amazon/aws/operators/dms.py:
##########
@@ -277,3 +288,551 @@ def execute(self, context: Context):
         """Stop AWS DMS replication task from Airflow."""
         self.hook.stop_replication_task(replication_task_arn=self.replication_task_arn)
         self.log.info("DMS replication task(%s) is stopping.", self.replication_task_arn)
+
+
+class DmsDescribeReplicationConfigsOperator(AwsBaseOperator[DmsHook]):
+    """
+    Describes AWS DMS Serverless replication configurations.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DmsDescribeReplicationConfigsOperator`
+
+    :param filter: Filters block for filtering results.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    """
+
+    aws_hook_class = DmsHook
+    template_fields: Sequence[str] = aws_template_fields("filter")
+    template_fields_renderers = {"filter": "json"}
+
+    def __init__(
+        self,
+        *,
+        filter: list[dict] | None = None,
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.filter = filter
+
+    def execute(self, context: Context) -> list:
+        """
+        Describe AWS DMS replication configurations.
+
+        :return: List of replication configurations
+        """
+        return self.hook.describe_replication_configs(filters=self.filter)
+
+
+class DmsCreateReplicationConfigOperator(AwsBaseOperator[DmsHook]):
+    """
+    Creates an AWS DMS Serverless replication configuration.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DmsCreateReplicationConfigOperator`
+
+    :param replication_config_id: Unique identifier used to create a ReplicationConfigArn.
+    :param source_endpoint_arn: ARN of the source endpoint
+    :param target_endpoint_arn: ARN of the target endpoint
+    :param compute_config: Parameters for provisioning a DMS Serverless replication.
+    :param replication_type: Type of DMS Serverless replication.
+    :param table_mappings: JSON table mappings
+    :param additional_config_kwargs: Additional configuration parameters for
+        DMS Serverless replication. Passed directly to the API.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    """
+
+    aws_hook_class = DmsHook
+    template_fields: Sequence[str] = aws_template_fields(
+        "replication_config_id",
+        "source_endpoint_arn",
+        "target_endpoint_arn",
+        "compute_config",
+        "replication_type",
+        "table_mappings",
+    )
+
+    template_fields_renderers = {"compute_config": "json", "table_mappings": "json"}
+
+    def __init__(
+        self,
+        *,
+        replication_config_id: str,
+        source_endpoint_arn: str,
+        target_endpoint_arn: str,
+        compute_config: dict[str, Any],
+        replication_type: str,
+        table_mappings: str,
+        additional_config_kwargs: dict | None = None,
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(
+            aws_conn_id=aws_conn_id,
+            **kwargs,
+        )
+
+        self.replication_config_id = replication_config_id
+        self.source_endpoint_arn = source_endpoint_arn
+        self.target_endpoint_arn = target_endpoint_arn
+        self.compute_config = compute_config
+        self.replication_type = replication_type
+        self.table_mappings = table_mappings
+        self.additional_config_kwargs = additional_config_kwargs or {}
+
+    def execute(self, context: Context) -> str:
+        resp = self.hook.create_replication_config(
+            replication_config_id=self.replication_config_id,
+            source_endpoint_arn=self.source_endpoint_arn,
+            target_endpoint_arn=self.target_endpoint_arn,
+            compute_config=self.compute_config,
+            replication_type=self.replication_type,
+            table_mappings=self.table_mappings,
+            additional_config_kwargs=self.additional_config_kwargs,
+        )
+
+        self.log.info("DMS replication config(%s) has been created.", self.replication_config_id)
+        return resp
+
+
+class DmsDeleteReplicationConfigOperator(AwsBaseOperator[DmsHook]):
+    """
+    Deletes an AWS DMS Serverless replication configuration.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DmsDeleteReplicationConfigOperator`
+
+    :param replication_config_arn: ARN of the replication config
+    :param wait_for_completion: If True, waits for the replication config to be deleted before returning.
+        If False, the operator will return immediately after the request is made.
+    :param deferrable: Run the operator in deferrable mode.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    """
+
+    aws_hook_class = DmsHook
+    template_fields: Sequence[str] = aws_template_fields("replication_config_arn")
+
+    VALID_STATES = ["failed", "stopped", "created"]
+    DELETING_STATES = ["deleting"]
+    TERMINAL_PROVISION_STATES = ["deprovisioned", ""]
+
+    def __init__(
+        self,
+        *,
+        replication_config_arn: str,
+        wait_for_completion: bool = True,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+        waiter_delay: int = 5,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(
+            aws_conn_id=aws_conn_id,
+            **kwargs,
+        )
+
+        self.replication_config_arn = replication_config_arn
+        self.wait_for_completion = wait_for_completion
+        self.deferrable = deferrable
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
+
+    def execute(self, context: Context) -> None:
+        results = self.hook.describe_replications(
+            filters=[{"Name": "replication-config-arn", "Values": [self.replication_config_arn]}]
+        )
+
+        if len(results) > 0:
+            current_state = results[0].get("Status", "")
+            self.log.info(
+                "Current state of replication config(%s) is %s.", 
self.replication_config_arn, current_state
+            )
+            # replication must be deprovisioned before deleting
+            provision_status = self.hook.get_provision_status(
+                replication_config_arn=self.replication_config_arn
+            )
+
+            if (
+                current_state.lower() in self.VALID_STATES
+                and provision_status in self.TERMINAL_PROVISION_STATES
+            ):
+                self.log.info("DMS replication config(%s) is in valid state.", 
self.replication_config_arn)
+
+                self.hook.delete_replication_config(
+                    replication_config_arn=self.replication_config_arn,
+                    delay=self.waiter_delay,
+                    max_attempts=self.waiter_max_attempts,
+                )
+                self.handle_delete_wait()
+
+            else:
+                # Must be in a terminal state to delete
+                self.log.info(
+                    "DMS replication config(%s) cannot be deleted until replication is in terminal state and deprovisioned. Waiting for terminal state.",
+                    self.replication_config_arn,
+                )
+                if self.deferrable:
+                    if current_state.lower() not in self.VALID_STATES:
+                        self.log.info("Deferring until terminal status reached.")
+                        self.defer(
+                            trigger=DmsReplicationTerminalStatusTrigger(
+                                replication_config_arn=self.replication_config_arn,
+                                waiter_delay=self.waiter_delay,
+                                waiter_max_attempts=self.waiter_max_attempts,
+                                aws_conn_id=self.aws_conn_id,
+                            ),
+                            method_name="retry_execution",
+                        )
+                    if provision_status not in self.TERMINAL_PROVISION_STATES:  # not yet deprovisioned
+                        self.log.info("Deferring until deprovisioning completes.")
+                        self.defer(
+                            trigger=DmsReplicationDeprovisionedTrigger(
+                                replication_config_arn=self.replication_config_arn,
+                                waiter_delay=self.waiter_delay,
+                                waiter_max_attempts=self.waiter_max_attempts,
+                                aws_conn_id=self.aws_conn_id,
+                            ),
+                            method_name="retry_execution",
+                        )
+
+                else:
+                    self.hook.get_waiter("replication_terminal_status").wait(
+                        Filters=[{"Name": "replication-config-arn", "Values": [self.replication_config_arn]}],
+                        WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": self.waiter_max_attempts},
+                    )
+                    self.hook.get_waiter("replication_deprovisioned").wait(
+                        Filters=[{"Name": "replication-config-arn", "Values": [self.replication_config_arn]}],
+                        WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": self.waiter_max_attempts},
+                    )
+                    self.hook.delete_replication_config(self.replication_config_arn)
+                    self.handle_delete_wait()
+
+        else:
+            self.log.info("DMS replication config(%s) does not exist.", self.replication_config_arn)
+
+    def handle_delete_wait(self):
+        if self.deferrable:
+            self.log.info("Deferring until replication config is deleted.")
+            self.defer(
+                trigger=DmsReplicationConfigDeletedTrigger(
+                    replication_config_arn=self.replication_config_arn,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                    aws_conn_id=self.aws_conn_id,
+                ),
+                method_name="execute_complete",
+            )

Review Comment:
   That should be put under `if self.wait_for_completion:`. If `deferrable` is `True` but `wait_for_completion` is `False`, you should not wait.
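   A minimal sketch of the gating being suggested (hypothetical class and method names; the trigger/waiter calls from the diff are stubbed so the control flow can be exercised standalone). The point is that `deferrable=True` alone must not cause waiting; only `wait_for_completion=True` does:

   ```python
   class DeleteReplicationConfigSketch:
       """Stand-in for the operator, reduced to the wait/defer decision."""

       def __init__(self, wait_for_completion: bool, deferrable: bool):
           self.wait_for_completion = wait_for_completion
           self.deferrable = deferrable
           self.actions: list[str] = []  # records which branch ran

       def _defer_until_deleted(self) -> None:
           # Stands in for self.defer(trigger=DmsReplicationConfigDeletedTrigger(...))
           self.actions.append("deferred")

       def _wait_until_deleted(self) -> None:
           # Stands in for self.hook.get_waiter(...).wait(...)
           self.actions.append("waited")

       def handle_delete_wait(self) -> None:
           # Gate everything on wait_for_completion first, as the comment asks.
           if not self.wait_for_completion:
               self.actions.append("returned immediately")
               return
           if self.deferrable:
               self._defer_until_deleted()
           else:
               self._wait_until_deleted()
   ```

   With this shape, `wait_for_completion=False` returns immediately regardless of `deferrable`, and `wait_for_completion=True` picks deferral or a synchronous waiter based on `deferrable`.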


