ellisms commented on code in PR #43988:
URL: https://github.com/apache/airflow/pull/43988#discussion_r1842675565
##########
providers/src/airflow/providers/amazon/aws/operators/dms.py:
##########
@@ -277,3 +288,551 @@ def execute(self, context: Context):
"""Stop AWS DMS replication task from Airflow."""
self.hook.stop_replication_task(replication_task_arn=self.replication_task_arn)
self.log.info("DMS replication task(%s) is stopping.",
self.replication_task_arn)
+
+
+class DmsDescribeReplicationConfigsOperator(AwsBaseOperator[DmsHook]):
+ """
+ Describes AWS DMS Serverless replication configurations.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:DmsDescribeReplicationConfigsOperator`
+
+ :param describe_config_filter: Filters block for filtering results.
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ If this is ``None`` or empty then the default boto3 behaviour is used.
If
+ running Airflow in a distributed manner and aws_conn_id is None or
+ empty, then default boto3 configuration would be used (and must be
+ """
+
+ aws_hook_class = DmsHook
+ template_fields: Sequence[str] = aws_template_fields("filter")
+ template_fields_renderers = {"filter": "json"}
+
+ def __init__(
+ self,
+ *,
+ filter: list[dict] | None = None,
+ aws_conn_id: str | None = "aws_default",
+ **kwargs,
+ ):
+ super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+ self.filter = filter
+
+ def execute(self, context: Context) -> list:
+ """
+ Describe AWS DMS replication configurations.
+
+ :return: List of replication configurations
+ """
+ return self.hook.describe_replication_configs(filters=self.filter)
+
+
+class DmsCreateReplicationConfigOperator(AwsBaseOperator[DmsHook]):
+ """
+
+ Creates an AWS DMS Serverless replication configuration.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:DmsCreateReplicationConfigOperator`
+
+ :param replication_config_id: Unique identifier used to create a
ReplicationConfigArn.
+ :param source_endpoint_arn: ARN of the source endpoint
+ :param target_endpoint_arn: ARN of the target endpoint
+ :param compute_config: Parameters for provisioning an DMS Serverless
replication.
+ :param replication_type: type of DMS Serverless replication
+ :param table_mappings: JSON table mappings
+ :param tags: Key-value tag pairs
+ :param additional_config_kwargs: Additional configuration parameters for
DMS Serverless replication. Passed directly to the API
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ If this is ``None`` or empty then the default boto3 behaviour is used.
If
+ running Airflow in a distributed manner and aws_conn_id is None or
+ empty, then default boto3 configuration would be used (and must be
+ """
+
+ aws_hook_class = DmsHook
+ template_fields: Sequence[str] = aws_template_fields(
+ "replication_config_id",
+ "source_endpoint_arn",
+ "target_endpoint_arn",
+ "compute_config",
+ "replication_type",
+ "table_mappings",
+ )
+
+ template_fields_renderers = {"compute_config": "json", "tableMappings":
"json"}
+
+ def __init__(
+ self,
+ *,
+ replication_config_id: str,
+ source_endpoint_arn: str,
+ target_endpoint_arn: str,
+ compute_config: dict[str, Any],
+ replication_type: str,
+ table_mappings: str,
+ additional_config_kwargs: dict | None = None,
+ aws_conn_id: str | None = "aws_default",
+ **kwargs,
+ ):
+ super().__init__(
+ aws_conn_id=aws_conn_id,
+ **kwargs,
+ )
+
+ self.replication_config_id = replication_config_id
+ self.source_endpoint_arn = source_endpoint_arn
+ self.target_endpoint_arn = target_endpoint_arn
+ self.compute_config = compute_config
+ self.replication_type = replication_type
+ self.table_mappings = table_mappings
+ self.additional_config_kwargs = additional_config_kwargs or {}
+
+ def execute(self, context: Context) -> str:
+ resp = self.hook.create_replication_config(
+ replication_config_id=self.replication_config_id,
+ source_endpoint_arn=self.source_endpoint_arn,
+ target_endpoint_arn=self.target_endpoint_arn,
+ compute_config=self.compute_config,
+ replication_type=self.replication_type,
+ table_mappings=self.table_mappings,
+ additional_config_kwargs=self.additional_config_kwargs,
+ )
+
+ self.log.info("DMS replication config(%s) has been created.",
self.replication_config_id)
+ return resp
+
+
+class DmsDeleteReplicationConfigOperator(AwsBaseOperator[DmsHook]):
+ """
+
+ Deletes an AWS DMS Serverless replication configuration.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:DmsDeleteReplicationConfigOperator`
+
+ :param replication_config_arn: ARN of the replication config
+ :param wait_for_completion: If True, waits for the replication config to
be deleted before returning.
+ If False, the operator will return immediately after the request is
made.
+ :param deferrable: Run the operator in deferrable mode.
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ If this is ``None`` or empty then the default boto3 behaviour is used.
If
+ running Airflow in a distributed manner and aws_conn_id is None or
+ empty, then default boto3 configuration would be used (and must be
+ """
+
+ aws_hook_class = DmsHook
+ template_fields: Sequence[str] =
aws_template_fields("replication_config_arn")
+
+ VALID_STATES = ["failed", "stopped", "created"]
+ DELETING_STATES = ["deleting"]
+ TERMINAL_PROVISION_STATES = ["deprovisioned", ""]
+
+ def __init__(
+ self,
+ *,
+ replication_config_arn: str,
+ wait_for_completion: bool = True,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
+ waiter_delay: int = 5,
+ waiter_max_attempts: int = 60,
+ aws_conn_id: str | None = "aws_default",
+ **kwargs,
+ ):
+ super().__init__(
+ aws_conn_id=aws_conn_id,
+ **kwargs,
+ )
+
+ self.replication_config_arn = replication_config_arn
+ self.wait_for_completion = wait_for_completion
+ self.deferrable = deferrable
+ self.waiter_delay = waiter_delay
+ self.waiter_max_attempts = waiter_max_attempts
+
+ def execute(self, context: Context) -> None:
+ results = self.hook.describe_replications(
+ filters=[{"Name": "replication-config-arn", "Values":
[self.replication_config_arn]}]
+ )
+
+ if len(results) > 0:
+ current_state = results[0].get("Status", "")
+ self.log.info(
+ "Current state of replication config(%s) is %s.",
self.replication_config_arn, current_state
+ )
+ # replication must be deprovisioned before deleting
+ provision_status = self.hook.get_provision_status(
+ replication_config_arn=self.replication_config_arn
+ )
+
+ if (
+ current_state.lower() in self.VALID_STATES
+ and provision_status in self.TERMINAL_PROVISION_STATES
+ ):
+ self.log.info("DMS replication config(%s) is in valid state.",
self.replication_config_arn)
+
+ self.hook.delete_replication_config(
+ replication_config_arn=self.replication_config_arn,
+ delay=self.waiter_delay,
+ max_attempts=self.waiter_max_attempts,
+ )
+ self.handle_delete_wait()
Review Comment:
Which block of code?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]