This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 96661789cc New AWS sensor — DynamoDBValueSensor (#28338)
96661789cc is described below
commit 96661789ccfd6798677cd7f15e987e24c1e9db1b
Author: Mark Richman <[email protected]>
AuthorDate: Fri Apr 14 03:34:45 2023 -0400
New AWS sensor — DynamoDBValueSensor (#28338)
---
airflow/providers/amazon/aws/sensors/dynamodb.py | 95 +++++++++++++++++
airflow/providers/amazon/provider.yaml | 5 +
.../operators/dynamodb.rst | 55 ++++++++++
.../providers/amazon/aws/sensors/test_dynamodb.py | 93 +++++++++++++++++
.../providers/amazon/aws/example_dynamodb.py | 113 +++++++++++++++++++++
5 files changed, 361 insertions(+)
diff --git a/airflow/providers/amazon/aws/sensors/dynamodb.py
b/airflow/providers/amazon/aws/sensors/dynamodb.py
new file mode 100644
index 0000000000..6e229bbbab
--- /dev/null
+++ b/airflow/providers/amazon/aws/sensors/dynamodb.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+from airflow.compat.functools import cached_property
+from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
+from airflow.sensors.base import BaseSensorOperator
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+
class DynamoDBValueSensor(BaseSensorOperator):
    """
    Waits for an attribute value to be present for an item in a DynamoDB table.

    :param table_name: DynamoDB table name
    :param partition_key_name: DynamoDB partition key name
    :param partition_key_value: DynamoDB partition key value
    :param attribute_name: DynamoDB attribute name
    :param attribute_value: DynamoDB attribute value
    :param sort_key_name: (optional) DynamoDB sort key name
    :param sort_key_value: (optional) DynamoDB sort key value
    :param aws_conn_id: aws connection to use
    :param region_name: aws region to use
    """

    def __init__(
        self,
        table_name: str,
        partition_key_name: str,
        partition_key_value: str,
        attribute_name: str,
        attribute_value: str,
        sort_key_name: str | None = None,
        sort_key_value: str | None = None,
        aws_conn_id: str | None = DynamoDBHook.default_conn_name,
        region_name: str | None = None,
        **kwargs: Any,
    ):
        super().__init__(**kwargs)
        self.table_name = table_name
        self.partition_key_name = partition_key_name
        self.partition_key_value = partition_key_value
        self.attribute_name = attribute_name
        self.attribute_value = attribute_value
        self.sort_key_name = sort_key_name
        self.sort_key_value = sort_key_value
        self.aws_conn_id = aws_conn_id
        self.region_name = region_name

    def poke(self, context: Context) -> bool:
        """Test a DynamoDB item for a matching attribute value."""
        key = {self.partition_key_name: self.partition_key_value}
        msg = (
            f"Checking table {self.table_name} for "
            f"item Partition Key: {self.partition_key_name}={self.partition_key_value}"
        )

        # The sort key only participates in the lookup when both name and value are set.
        if self.sort_key_name and self.sort_key_value:
            key[self.sort_key_name] = self.sort_key_value
            msg += f"\nSort Key: {self.sort_key_name}={self.sort_key_value}"

        msg += f"\nattribute: {self.attribute_name}={self.attribute_value}"

        self.log.info(msg)
        table = self.hook.conn.Table(self.table_name)
        self.log.info("Table: %s", table)
        self.log.info("Key: %s", key)
        response = table.get_item(Key=key)
        try:
            # KeyError here means the item (or the attribute) does not exist yet.
            item_value = response["Item"][self.attribute_name]
            self.log.info("Response: %s", response)
            self.log.info("Want: %s = %s", self.attribute_name, self.attribute_value)
            # BUG FIX: the original passed a plain (non-f) string containing
            # "{response['Item'][self.attribute_name]}", so the literal placeholder
            # text was logged instead of the value actually read from DynamoDB.
            self.log.info("Got: %s = %s", self.attribute_name, item_value)
            return item_value == self.attribute_value
        except KeyError:
            return False

    @cached_property
    def hook(self) -> DynamoDBHook:
        """Create and return a DynamoDBHook."""
        return DynamoDBHook(self.aws_conn_id, region_name=self.region_name)
diff --git a/airflow/providers/amazon/provider.yaml
b/airflow/providers/amazon/provider.yaml
index 05ee544820..b46f17323d 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -97,6 +97,8 @@ integrations:
- integration-name: Amazon DynamoDB
external-doc-url: https://aws.amazon.com/dynamodb/
logo: /integration-logos/aws/[email protected]
+ how-to-guide:
+ - /docs/apache-airflow-providers-amazon/operators/dynamodb.rst
tags: [aws]
- integration-name: Amazon EC2
external-doc-url: https://aws.amazon.com/ec2/
@@ -347,6 +349,9 @@ sensors:
- integration-name: AWS Database Migration Service
python-modules:
- airflow.providers.amazon.aws.sensors.dms
+ - integration-name: Amazon DynamoDB
+ python-modules:
+ - airflow.providers.amazon.aws.sensors.dynamodb
- integration-name: Amazon EC2
python-modules:
- airflow.providers.amazon.aws.sensors.ec2
diff --git a/docs/apache-airflow-providers-amazon/operators/dynamodb.rst
b/docs/apache-airflow-providers-amazon/operators/dynamodb.rst
new file mode 100644
index 0000000000..1a3693a455
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/operators/dynamodb.rst
@@ -0,0 +1,55 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+===============
+Amazon DynamoDB
+===============
+
+`Amazon DynamoDB <https://aws.amazon.com/dynamodb/>`__ is a
+fully managed, serverless, key-value NoSQL database designed to run
+high-performance applications at any scale. DynamoDB offers built-in security,
+continuous backups, automated multi-Region replication, in-memory caching, and
+data import and export tools.
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+
+Sensors
+-------
+
+.. _howto/sensor:DynamoDBValueSensor:
+
+Wait on Amazon DynamoDB item attribute value match
+==================================================
+
+Use the
:class:`~airflow.providers.amazon.aws.sensors.dynamodb.DynamoDBValueSensor`
+to wait for the presence of a matching DynamoDB item's attribute/value pair.
+
+.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_dynamodb.py
+ :language: python
+ :start-after: [START howto_sensor_dynamodb]
+ :dedent: 4
+ :end-before: [END howto_sensor_dynamodb]
+
+
+Reference
+---------
+
+* `AWS boto3 library documentation for DynamoDB
<https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html>`__
diff --git a/tests/providers/amazon/aws/sensors/test_dynamodb.py
b/tests/providers/amazon/aws/sensors/test_dynamodb.py
new file mode 100644
index 0000000000..05668f5444
--- /dev/null
+++ b/tests/providers/amazon/aws/sensors/test_dynamodb.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from moto import mock_dynamodb
+
+from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
+from airflow.providers.amazon.aws.sensors.dynamodb import DynamoDBValueSensor
+
+
class TestDynamoDBValueSensor:
    def setup_method(self):
        """Wire up a sensor against a fixed test table, keys, and attribute pair."""
        self.table_name = "test_airflow"
        self.pk_name, self.pk_value = "PK", "PKTest"
        self.sk_name, self.sk_value = "SK", "SKTest"
        self.attribute_name, self.attribute_value = "Foo", "Bar"

        self.sensor = DynamoDBValueSensor(
            task_id="dynamodb_value_sensor",
            table_name=self.table_name,
            partition_key_name=self.pk_name,
            partition_key_value=self.pk_value,
            attribute_name=self.attribute_name,
            attribute_value=self.attribute_value,
            sort_key_name=self.sk_name,
            sort_key_value=self.sk_value,
        )

    @mock_dynamodb
    def test_sensor_with_pk(self):
        """Poke is False before the item exists and True afterwards (hash key only)."""
        dynamo_hook = DynamoDBHook(table_name=self.table_name, table_keys=[self.pk_name])

        dynamo_hook.conn.create_table(
            TableName=self.table_name,
            KeySchema=[{"AttributeName": self.pk_name, "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": self.pk_name, "AttributeType": "S"}],
            ProvisionedThroughput={"ReadCapacityUnits": 10, "WriteCapacityUnits": 10},
        )

        assert not self.sensor.poke(None)

        dynamo_hook.write_batch_data(
            [{self.pk_name: self.pk_value, self.attribute_name: self.attribute_value}]
        )

        assert self.sensor.poke(None)

    @mock_dynamodb
    def test_sensor_with_pk_and_sk(self):
        """Poke is False before the item exists and True afterwards (hash + range key)."""
        dynamo_hook = DynamoDBHook(
            table_name=self.table_name, table_keys=[self.pk_name, self.sk_name]
        )

        dynamo_hook.conn.create_table(
            TableName=self.table_name,
            KeySchema=[
                {"AttributeName": self.pk_name, "KeyType": "HASH"},
                {"AttributeName": self.sk_name, "KeyType": "RANGE"},
            ],
            AttributeDefinitions=[
                {"AttributeName": self.pk_name, "AttributeType": "S"},
                {"AttributeName": self.sk_name, "AttributeType": "S"},
            ],
            ProvisionedThroughput={"ReadCapacityUnits": 10, "WriteCapacityUnits": 10},
        )

        assert not self.sensor.poke(None)

        dynamo_hook.write_batch_data(
            [
                {
                    self.pk_name: self.pk_value,
                    self.sk_name: self.sk_value,
                    self.attribute_name: self.attribute_value,
                }
            ]
        )

        assert self.sensor.poke(None)
diff --git a/tests/system/providers/amazon/aws/example_dynamodb.py
b/tests/system/providers/amazon/aws/example_dynamodb.py
new file mode 100644
index 0000000000..37bea8f2b9
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_dynamodb.py
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+import boto3
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.sensors.dynamodb import DynamoDBValueSensor
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY,
SystemTestContextBuilder
+
+DAG_ID = "example_dynamodbvaluesensor"
+sys_test_context_task = SystemTestContextBuilder().build()
+
+TABLE_ATTRIBUTES = [
+ {"AttributeName": "PK", "AttributeType": "S"},
+ {"AttributeName": "SK", "AttributeType": "S"},
+]
+TABLE_KEY_SCHEMA = [
+ {"AttributeName": "PK", "KeyType": "HASH"},
+ {"AttributeName": "SK", "KeyType": "RANGE"},
+]
+TABLE_THROUGHPUT = {"ReadCapacityUnits": 10, "WriteCapacityUnits": 10}
+
+
@task
def create_table(table_name: str):
    """Create the DynamoDB table and seed it with the item the sensor waits for."""
    ddb = boto3.resource("dynamodb")
    table = ddb.create_table(
        AttributeDefinitions=TABLE_ATTRIBUTES,
        TableName=table_name,
        KeySchema=TABLE_KEY_SCHEMA,
        ProvisionedThroughput=TABLE_THROUGHPUT,
    )
    # BUG FIX: the table_exists waiter requires TableName; without it the call
    # raises a ParamValidationError instead of waiting for this table to be ready.
    boto3.client("dynamodb").get_waiter("table_exists").wait(TableName=table_name)
    table.put_item(Item={"PK": "Test", "SK": "2022-07-12T11:11:25-0400", "Value": "Testing"})
+
+
@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_table(table_name: str):
    """Delete the DynamoDB table and wait until it is gone (runs even after upstream failure)."""
    client = boto3.client("dynamodb")
    # BUG FIX: delete_table is a client method — the DynamoDB service *resource*
    # has no delete_table, so the original raised AttributeError. The
    # table_not_exists waiter also requires TableName.
    client.delete_table(TableName=table_name)
    client.get_waiter("table_not_exists").wait(TableName=table_name)
+
+
with DAG(
    dag_id=DAG_ID,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    test_context = sys_test_context_task()
    env_id = test_context[ENV_ID_KEY]
    table_name = f"{env_id}-dynamodb-table"

    # Renamed from `create_table` so the task instance does not shadow the task function.
    create_table_task = create_table(table_name=table_name)

    # [START howto_sensor_dynamodb]
    dynamodb_sensor = DynamoDBValueSensor(
        task_id="waiting_for_dynamodb_item_value",
        poke_interval=30,
        timeout=120,
        soft_fail=False,
        retries=10,
        # BUG FIX: the sensor must poll the table this DAG actually creates;
        # the original hard-coded "AirflowSensorTest" and could never succeed.
        table_name=table_name,
        partition_key_name="PK",
        partition_key_value="Test",
        sort_key_name="SK",
        sort_key_value="2022-07-12T11:11:25-0400",
        attribute_name="Value",
        attribute_value="Testing",
    )
    # [END howto_sensor_dynamodb]

    chain(
        # TEST SETUP
        test_context,
        # TEST BODY
        create_table_task,
        dynamodb_sensor,
        # TEST TEARDOWN
        # BUG FIX: the original passed the undecorated `delete_table` function to
        # chain() without calling it, so the teardown task was never created.
        delete_table(table_name=table_name),
    )

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()


from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)