This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fb3b980b5d Update the DMS Sample DAG and Docs (#23681)
fb3b980b5d is described below
commit fb3b980b5d313b6773f45fa191766eedfea4e36d
Author: D. Ferruzzi <[email protected]>
AuthorDate: Thu May 19 09:49:30 2022 -0700
Update the DMS Sample DAG and Docs (#23681)
---
.../amazon/aws/example_dags/example_dms.py | 347 +++++++++++++++++++++
.../aws/example_dags/example_dms_full_load_task.py | 92 ------
airflow/providers/amazon/aws/operators/dms.py | 10 +-
.../operators/dms.rst | 121 +++++--
4 files changed, 445 insertions(+), 125 deletions(-)
diff --git a/airflow/providers/amazon/aws/example_dags/example_dms.py
b/airflow/providers/amazon/aws/example_dags/example_dms.py
new file mode 100644
index 0000000000..e2ce8b852b
--- /dev/null
+++ b/airflow/providers/amazon/aws/example_dags/example_dms.py
@@ -0,0 +1,347 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Note: DMS requires you to configure specific IAM roles/permissions. For more
information, see
+https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Security.html#CHAP_Security.APIRole
+"""
+
+import json
+import os
+from datetime import datetime
+
+import boto3
+from sqlalchemy import Column, MetaData, String, Table, create_engine
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models.baseoperator import chain
+from airflow.operators.python import get_current_context
+from airflow.providers.amazon.aws.operators.dms import (
+ DmsCreateTaskOperator,
+ DmsDeleteTaskOperator,
+ DmsDescribeTasksOperator,
+ DmsStartTaskOperator,
+ DmsStopTaskOperator,
+)
+from airflow.providers.amazon.aws.sensors.dms import DmsTaskBaseSensor,
DmsTaskCompletedSensor
+
+S3_BUCKET = os.getenv('S3_BUCKET', 's3_bucket_name')
+ROLE_ARN = os.getenv('ROLE_ARN',
'arn:aws:iam::1234567890:role/s3_target_endpoint_role')
+
+# The project name will be used as a prefix for various entity names.
+# Use either PascalCase or camelCase. While some names require kebab-case
+# and others require snake_case, they all accept mixedCase strings.
+PROJECT_NAME = 'DmsDemo'
+
+# Config values for setting up the "Source" database.
+RDS_ENGINE = 'postgres'
+RDS_PROTOCOL = 'postgresql'
+RDS_USERNAME = 'username'
+# NEVER store your production password in plaintext in a DAG like this.
+# Use Airflow Secrets or a secret manager for this in production.
+RDS_PASSWORD = 'rds_password'
+
+# Config values for RDS.
+RDS_INSTANCE_NAME = f'{PROJECT_NAME}-instance'
+RDS_DB_NAME = f'{PROJECT_NAME}_source_database'
+
+# Config values for DMS.
+DMS_REPLICATION_INSTANCE_NAME = f'{PROJECT_NAME}-replication-instance'
+DMS_REPLICATION_TASK_ID = f'{PROJECT_NAME}-replication-task'
+SOURCE_ENDPOINT_IDENTIFIER = f'{PROJECT_NAME}-source-endpoint'
+TARGET_ENDPOINT_IDENTIFIER = f'{PROJECT_NAME}-target-endpoint'
+
+# Sample data.
+TABLE_NAME = f'{PROJECT_NAME}-table'
+TABLE_HEADERS = ['apache_project', 'release_year']
+SAMPLE_DATA = [
+ ('Airflow', '2015'),
+ ('OpenOffice', '2012'),
+ ('Subversion', '2000'),
+ ('NiFi', '2006'),
+]
+TABLE_DEFINITION = {
+ 'TableCount': '1',
+ 'Tables': [
+ {
+ 'TableName': TABLE_NAME,
+ 'TableColumns': [
+ {
+ 'ColumnName': TABLE_HEADERS[0],
+ 'ColumnType': 'STRING',
+ 'ColumnNullable': 'false',
+ 'ColumnIsPk': 'true',
+ },
+ {"ColumnName": TABLE_HEADERS[1], "ColumnType": 'STRING',
"ColumnLength": "4"},
+ ],
+ 'TableColumnsTotal': '2',
+ }
+ ],
+}
+TABLE_MAPPINGS = {
+ 'rules': [
+ {
+ 'rule-type': 'selection',
+ 'rule-id': '1',
+ 'rule-name': '1',
+ 'object-locator': {
+ 'schema-name': 'public',
+ 'table-name': TABLE_NAME,
+ },
+ 'rule-action': 'include',
+ }
+ ]
+}
+
+
+def _create_rds_instance():
+ print('Creating RDS Instance.')
+
+ rds_client = boto3.client('rds')
+ rds_client.create_db_instance(
+ DBName=RDS_DB_NAME,
+ DBInstanceIdentifier=RDS_INSTANCE_NAME,
+ AllocatedStorage=20,
+ DBInstanceClass='db.t3.micro',
+ Engine=RDS_ENGINE,
+ MasterUsername=RDS_USERNAME,
+ MasterUserPassword=RDS_PASSWORD,
+ )
+
+
rds_client.get_waiter('db_instance_available').wait(DBInstanceIdentifier=RDS_INSTANCE_NAME)
+
+ response =
rds_client.describe_db_instances(DBInstanceIdentifier=RDS_INSTANCE_NAME)
+ return response['DBInstances'][0]['Endpoint']
+
+
+def _create_rds_table(rds_endpoint):
+ print('Creating table.')
+
+ hostname = rds_endpoint['Address']
+ port = rds_endpoint['Port']
+ rds_url =
f'{RDS_PROTOCOL}://{RDS_USERNAME}:{RDS_PASSWORD}@{hostname}:{port}/{RDS_DB_NAME}'
+ engine = create_engine(rds_url)
+
+ table = Table(
+ TABLE_NAME,
+ MetaData(engine),
+ Column(TABLE_HEADERS[0], String, primary_key=True),
+ Column(TABLE_HEADERS[1], String),
+ )
+
+ with engine.connect() as connection:
+ # Create the Table.
+ table.create()
+ load_data = table.insert().values(SAMPLE_DATA)
+ connection.execute(load_data)
+
+ # Read the data back to verify everything is working.
+ connection.execute(table.select())
+
+
+def _create_dms_replication_instance(ti, dms_client):
+ print('Creating replication instance.')
+ instance_arn = dms_client.create_replication_instance(
+ ReplicationInstanceIdentifier=DMS_REPLICATION_INSTANCE_NAME,
+ ReplicationInstanceClass='dms.t3.micro',
+ )['ReplicationInstance']['ReplicationInstanceArn']
+
+ ti.xcom_push(key='replication_instance_arn', value=instance_arn)
+ return instance_arn
+
+
+def _create_dms_endpoints(ti, dms_client, rds_instance_endpoint):
+ print('Creating DMS source endpoint.')
+ source_endpoint_arn = dms_client.create_endpoint(
+ EndpointIdentifier=SOURCE_ENDPOINT_IDENTIFIER,
+ EndpointType='source',
+ EngineName=RDS_ENGINE,
+ Username=RDS_USERNAME,
+ Password=RDS_PASSWORD,
+ ServerName=rds_instance_endpoint['Address'],
+ Port=rds_instance_endpoint['Port'],
+ DatabaseName=RDS_DB_NAME,
+ )['Endpoint']['EndpointArn']
+
+ print('Creating DMS target endpoint.')
+ target_endpoint_arn = dms_client.create_endpoint(
+ EndpointIdentifier=TARGET_ENDPOINT_IDENTIFIER,
+ EndpointType='target',
+ EngineName='s3',
+ S3Settings={
+ 'BucketName': S3_BUCKET,
+ 'BucketFolder': PROJECT_NAME,
+ 'ServiceAccessRoleArn': ROLE_ARN,
+ 'ExternalTableDefinition': json.dumps(TABLE_DEFINITION),
+ },
+ )['Endpoint']['EndpointArn']
+
+ ti.xcom_push(key='source_endpoint_arn', value=source_endpoint_arn)
+ ti.xcom_push(key='target_endpoint_arn', value=target_endpoint_arn)
+
+
+def _await_setup_assets(dms_client, instance_arn):
+ print("Awaiting asset provisioning.")
+ dms_client.get_waiter('replication_instance_available').wait(
+ Filters=[{'Name': 'replication-instance-arn', 'Values':
[instance_arn]}]
+ )
+
+
+def _delete_rds_instance():
+ print('Deleting RDS Instance.')
+
+ rds_client = boto3.client('rds')
+ rds_client.delete_db_instance(
+ DBInstanceIdentifier=RDS_INSTANCE_NAME,
+ SkipFinalSnapshot=True,
+ )
+
+
rds_client.get_waiter('db_instance_deleted').wait(DBInstanceIdentifier=RDS_INSTANCE_NAME)
+
+
+def _delete_dms_assets(dms_client):
+ ti = get_current_context()['ti']
+ replication_instance_arn = ti.xcom_pull(key='replication_instance_arn')
+ source_arn = ti.xcom_pull(key='source_endpoint_arn')
+ target_arn = ti.xcom_pull(key='target_endpoint_arn')
+
+ print('Deleting DMS assets.')
+
dms_client.delete_replication_instance(ReplicationInstanceArn=replication_instance_arn)
+ dms_client.delete_endpoint(EndpointArn=source_arn)
+ dms_client.delete_endpoint(EndpointArn=target_arn)
+
+
+def _await_all_teardowns(dms_client):
+ print('Awaiting tear-down.')
+ dms_client.get_waiter('replication_instance_deleted').wait(
+ Filters=[{'Name': 'replication-instance-id', 'Values':
[DMS_REPLICATION_INSTANCE_NAME]}]
+ )
+
+ dms_client.get_waiter('endpoint_deleted').wait(
+ Filters=[
+ {
+ 'Name': 'endpoint-id',
+ 'Values': [SOURCE_ENDPOINT_IDENTIFIER,
TARGET_ENDPOINT_IDENTIFIER],
+ }
+ ]
+ )
+
+
+@task
+def set_up():
+ ti = get_current_context()['ti']
+ dms_client = boto3.client('dms')
+
+ rds_instance_endpoint = _create_rds_instance()
+ _create_rds_table(rds_instance_endpoint)
+ instance_arn = _create_dms_replication_instance(ti, dms_client)
+ _create_dms_endpoints(ti, dms_client, rds_instance_endpoint)
+ _await_setup_assets(dms_client, instance_arn)
+
+
+@task(trigger_rule='all_done')
+def clean_up():
+ dms_client = boto3.client('dms')
+
+ _delete_rds_instance()
+ _delete_dms_assets(dms_client)
+ _await_all_teardowns(dms_client)
+
+
+with DAG(
+ dag_id='example_dms',
+ schedule_interval=None,
+ start_date=datetime(2021, 1, 1),
+ tags=['example'],
+ catchup=False,
+) as dag:
+
+ # [START howto_operator_dms_create_task]
+ create_task = DmsCreateTaskOperator(
+ task_id='create_task',
+ replication_task_id=DMS_REPLICATION_TASK_ID,
+ source_endpoint_arn='{{ ti.xcom_pull(key="source_endpoint_arn") }}',
+ target_endpoint_arn='{{ ti.xcom_pull(key="target_endpoint_arn") }}',
+ replication_instance_arn='{{
ti.xcom_pull(key="replication_instance_arn") }}',
+ table_mappings=TABLE_MAPPINGS,
+ )
+ # [END howto_operator_dms_create_task]
+
+ # [START howto_operator_dms_start_task]
+ start_task = DmsStartTaskOperator(
+ task_id='start_task',
+ replication_task_arn=create_task.output,
+ )
+ # [END howto_operator_dms_start_task]
+
+ # [START howto_operator_dms_describe_tasks]
+ describe_tasks = DmsDescribeTasksOperator(
+ task_id='describe_tasks',
+ describe_tasks_kwargs={
+ 'Filters': [
+ {
+ 'Name': 'replication-instance-arn',
+ 'Values': ['{{
ti.xcom_pull(key="replication_instance_arn") }}'],
+ }
+ ]
+ },
+ do_xcom_push=False,
+ )
+ # [END howto_operator_dms_describe_tasks]
+
+ await_task_start = DmsTaskBaseSensor(
+ task_id='await_task_start',
+ replication_task_arn=create_task.output,
+ target_statuses=['running'],
+ termination_statuses=['stopped', 'deleting', 'failed'],
+ )
+
+ # [START howto_operator_dms_stop_task]
+ stop_task = DmsStopTaskOperator(
+ task_id='stop_task',
+ replication_task_arn=create_task.output,
+ )
+ # [END howto_operator_dms_stop_task]
+
+ # TaskCompletedSensor actually waits until task reaches the "Stopped"
state, so it will work here.
+ # [START howto_operator_dms_task_completed_sensor]
+ await_task_stop = DmsTaskCompletedSensor(
+ task_id='await_task_stop',
+ replication_task_arn=create_task.output,
+ )
+ # [END howto_operator_dms_task_completed_sensor]
+
+ # [START howto_operator_dms_delete_task]
+ delete_task = DmsDeleteTaskOperator(
+ task_id='delete_task',
+ replication_task_arn=create_task.output,
+ trigger_rule='all_done',
+ )
+ # [END howto_operator_dms_delete_task]
+
+ chain(
+ set_up()
+ >> create_task
+ >> start_task
+ >> describe_tasks
+ >> await_task_start
+ >> stop_task
+ >> await_task_stop
+ >> delete_task
+ >> clean_up()
+ )
diff --git
a/airflow/providers/amazon/aws/example_dags/example_dms_full_load_task.py
b/airflow/providers/amazon/aws/example_dags/example_dms_full_load_task.py
deleted file mode 100644
index 939ca36dfd..0000000000
--- a/airflow/providers/amazon/aws/example_dags/example_dms_full_load_task.py
+++ /dev/null
@@ -1,92 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-This is an example dag for running full load DMS replication task.
-"""
-from datetime import datetime, timedelta
-from os import getenv
-
-from airflow import DAG
-from airflow.providers.amazon.aws.operators.dms import (
- DmsCreateTaskOperator,
- DmsDeleteTaskOperator,
- DmsStartTaskOperator,
-)
-from airflow.providers.amazon.aws.sensors.dms import DmsTaskCompletedSensor
-
-REPLICATION_TASK_ID = getenv('REPLICATION_TASK_ID', 'full-load-test-export')
-SOURCE_ENDPOINT_ARN = getenv('SOURCE_ENDPOINT_ARN', 'source_endpoint_arn')
-TARGET_ENDPOINT_ARN = getenv('TARGET_ENDPOINT_ARN', 'target_endpoint_arn')
-REPLICATION_INSTANCE_ARN = getenv('REPLICATION_INSTANCE_ARN',
'replication_instance_arn')
-TABLE_MAPPINGS = {
- 'rules': [
- {
- 'rule-type': 'selection',
- 'rule-id': '1',
- 'rule-name': '1',
- 'object-locator': {
- 'schema-name': 'test',
- 'table-name': '%',
- },
- 'rule-action': 'include',
- }
- ]
-}
-
-
-with DAG(
- dag_id='dms_full_load_task_run_dag',
- dagrun_timeout=timedelta(hours=2),
- start_date=datetime(2021, 1, 1),
- schedule_interval='0 3 * * *',
- catchup=False,
- tags=['example'],
-) as dag:
-
- # [START howto_dms_operators]
- create_task = DmsCreateTaskOperator(
- task_id='create_task',
- replication_task_id=REPLICATION_TASK_ID,
- source_endpoint_arn=SOURCE_ENDPOINT_ARN,
- target_endpoint_arn=TARGET_ENDPOINT_ARN,
- replication_instance_arn=REPLICATION_INSTANCE_ARN,
- table_mappings=TABLE_MAPPINGS,
- )
-
- start_task = DmsStartTaskOperator(
- task_id='start_task',
- replication_task_arn=create_task.output,
- )
-
- wait_for_completion = DmsTaskCompletedSensor(
- task_id='wait_for_completion',
- replication_task_arn=create_task.output,
- )
-
- delete_task = DmsDeleteTaskOperator(
- task_id='delete_task',
- replication_task_arn=create_task.output,
- )
- # [END howto_dms_operators]
-
- start_task >> wait_for_completion >> delete_task
-
- # Task dependencies created via `XComArgs`:
- # create_task >> start_task
- # create_task >> wait_for_completion
- # create_task >> delete_task
diff --git a/airflow/providers/amazon/aws/operators/dms.py
b/airflow/providers/amazon/aws/operators/dms.py
index aad1615552..aca515cfed 100644
--- a/airflow/providers/amazon/aws/operators/dms.py
+++ b/airflow/providers/amazon/aws/operators/dms.py
@@ -15,8 +15,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-
from typing import TYPE_CHECKING, Dict, Optional, Sequence
from airflow.models import BaseOperator
@@ -154,6 +152,10 @@ class DmsDescribeTasksOperator(BaseOperator):
"""
Describes AWS DMS replication tasks.
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:DmsDescribeTasksOperator`
+
:param describe_tasks_kwargs: Describe tasks command arguments
:param aws_conn_id: The Airflow connection used for AWS credentials.
If this is None or empty then the default boto3 behaviour is used. If
@@ -250,6 +252,10 @@ class DmsStopTaskOperator(BaseOperator):
"""
Stops AWS DMS replication task.
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:DmsStopTaskOperator`
+
:param replication_task_arn: Replication task ARN
:param aws_conn_id: The Airflow connection used for AWS credentials.
If this is None or empty then the default boto3 behaviour is used. If
diff --git a/docs/apache-airflow-providers-amazon/operators/dms.rst
b/docs/apache-airflow-providers-amazon/operators/dms.rst
index 5e18442671..8ca092b6fc 100644
--- a/docs/apache-airflow-providers-amazon/operators/dms.rst
+++ b/docs/apache-airflow-providers-amazon/operators/dms.rst
@@ -16,56 +16,115 @@
under the License.
-AWS Database Migration Service Operators
-========================================
+==============================================
+AWS Database Migration Service (DMS) Operators
+==============================================
+
+`AWS Database Migration Service (AWS DMS) <https://docs.aws.amazon.com/dms/>`__
+is a web service you can use to migrate data from your database that is
+on-premises, on an Amazon Relational Database Service (Amazon RDS) DB instance,
+or in a database on an Amazon Elastic Compute Cloud (Amazon EC2) instance to a
+database on an AWS service. These services can include a database on Amazon
RDS
+or a database on an Amazon EC2 instance. You can also migrate a database from
an
+AWS service to an on-premises database. You can migrate between source and
target
+endpoints that use the same database engine, such as from an Oracle database
to an
+Oracle database. You can also migrate between source and target endpoints that
use
+different database engines, such as from an Oracle database to a PostgreSQL
database.
Prerequisite Tasks
------------------
.. include:: _partials/prerequisite_tasks.rst
-Overview
---------
+.. _howto/operator:DmsCreateTaskOperator:
-Airflow to AWS Database Migration Service (DMS) integration provides several
operators to create and interact with
-DMS replication tasks.
+Operators
+---------
- - :class:`~airflow.providers.amazon.aws.sensors.dms.DmsTaskBaseSensor`
- - :class:`~airflow.providers.amazon.aws.sensors.dms.DmsTaskCompletedSensor`
- - :class:`~airflow.providers.amazon.aws.operators.dms.DmsCreateTaskOperator`
- - :class:`~airflow.providers.amazon.aws.operators.dms.DmsDeleteTaskOperator`
- -
:class:`~airflow.providers.amazon.aws.operators.dms.DmsDescribeTasksOperator`
- - :class:`~airflow.providers.amazon.aws.operators.dms.DmsStartTaskOperator`
- - :class:`~airflow.providers.amazon.aws.operators.dms.DmsStopTaskOperator`
+Create a replication task
+=========================
-One example_dag is provided which showcases some of these operators in action.
+To create a replication task you can use
+:class:`~airflow.providers.amazon.aws.operators.dms.DmsCreateTaskOperator`.
- - example_dms_full_load_task.py
+.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_dms.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_dms_create_task]
+ :end-before: [END howto_operator_dms_create_task]
-.. _howto/operator:DmsCreateTaskOperator:
-.. _howto/operator:DmsDeleteTaskOperator:
.. _howto/operator:DmsStartTaskOperator:
-.. _howto/sensor:DmsTaskCompletedSensor:
-Create replication task, wait for it completion and delete it.
---------------------------------------------------------------
+Start a replication task
+========================
+
+To start a replication task you can use
+:class:`~airflow.providers.amazon.aws.operators.dms.DmsStartTaskOperator`.
+
+.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_dms.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_dms_start_task]
+ :end-before: [END howto_operator_dms_start_task]
+
+.. _howto/operator:DmsDescribeTasksOperator:
+
+Get details of replication tasks
+================================
+
+To retrieve the details for a list of replication tasks you can use
+:class:`~airflow.providers.amazon.aws.operators.dms.DmsDescribeTasksOperator`.
+
+.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_dms.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_dms_describe_tasks]
+ :end-before: [END howto_operator_dms_describe_tasks]
+
+.. _howto/operator:DmsStopTaskOperator:
+
+Stop a replication task
+=======================
+
+To stop a replication task you can use
+:class:`~airflow.providers.amazon.aws.operators.dms.DmsStopTaskOperator`.
-Purpose
-"""""""
+.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_dms.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_dms_stop_task]
+ :end-before: [END howto_operator_dms_stop_task]
+
+.. _howto/operator:DmsDeleteTaskOperator:
-This example DAG ``example_dms_full_load_task.py`` uses
``DmsCreateTaskOperator``, ``DmsStartTaskOperator``,
-``DmsTaskCompletedSensor``, ``DmsDeleteTaskOperator`` to create replication
task, start it, wait for it
-to be completed, and then delete it.
+Delete a replication task
+=========================
+
+To delete a replication task you can use
+:class:`~airflow.providers.amazon.aws.operators.dms.DmsDeleteTaskOperator`.
+
+.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_dms.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_dms_delete_task]
+ :end-before: [END howto_operator_dms_delete_task]
+
+Sensors
+-------
+
+.. _howto/sensor:DmsTaskCompletedSensor:
-Defining tasks
-""""""""""""""
+Wait for a replication task to complete
+=======================================
-In the following code we create a new replication task, start it, wait for it
to be completed and then delete it.
+To check the state of a replication task until it is completed, you can use
+:class:`~airflow.providers.amazon.aws.sensors.dms.DmsTaskCompletedSensor`.
-.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_dms_full_load_task.py
+.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_dms.py
:language: python
- :start-after: [START howto_dms_operators]
- :end-before: [END howto_dms_operators]
+ :dedent: 4
+ :start-after: [START howto_operator_dms_task_completed_sensor]
+ :end-before: [END howto_operator_dms_task_completed_sensor]
Reference