This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 96661789cc New AWS sensor — DynamoDBValueSensor (#28338)
96661789cc is described below

commit 96661789ccfd6798677cd7f15e987e24c1e9db1b
Author: Mark Richman <[email protected]>
AuthorDate: Fri Apr 14 03:34:45 2023 -0400

    New AWS sensor — DynamoDBValueSensor (#28338)
---
 airflow/providers/amazon/aws/sensors/dynamodb.py   |  95 +++++++++++++++++
 airflow/providers/amazon/provider.yaml             |   5 +
 .../operators/dynamodb.rst                         |  55 ++++++++++
 .../providers/amazon/aws/sensors/test_dynamodb.py  |  93 +++++++++++++++++
 .../providers/amazon/aws/example_dynamodb.py       | 113 +++++++++++++++++++++
 5 files changed, 361 insertions(+)

diff --git a/airflow/providers/amazon/aws/sensors/dynamodb.py 
b/airflow/providers/amazon/aws/sensors/dynamodb.py
new file mode 100644
index 0000000000..6e229bbbab
--- /dev/null
+++ b/airflow/providers/amazon/aws/sensors/dynamodb.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+from airflow.compat.functools import cached_property
+from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
+from airflow.sensors.base import BaseSensorOperator
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
class DynamoDBValueSensor(BaseSensorOperator):
    """
    Waits for an attribute value to be present for an item in a DynamoDB table.

    :param table_name: DynamoDB table name
    :param partition_key_name: DynamoDB partition key name
    :param partition_key_value: DynamoDB partition key value
    :param attribute_name: DynamoDB attribute name
    :param attribute_value: DynamoDB attribute value
    :param sort_key_name: (optional) DynamoDB sort key name
    :param sort_key_value: (optional) DynamoDB sort key value
    :param aws_conn_id: aws connection to use
    :param region_name: aws region to use
    """

    def __init__(
        self,
        table_name: str,
        partition_key_name: str,
        partition_key_value: str,
        attribute_name: str,
        attribute_value: str,
        sort_key_name: str | None = None,
        sort_key_value: str | None = None,
        aws_conn_id: str | None = DynamoDBHook.default_conn_name,
        region_name: str | None = None,
        **kwargs: Any,
    ):
        super().__init__(**kwargs)
        self.table_name = table_name
        self.partition_key_name = partition_key_name
        self.partition_key_value = partition_key_value
        self.attribute_name = attribute_name
        self.attribute_value = attribute_value
        self.sort_key_name = sort_key_name
        self.sort_key_value = sort_key_value
        self.aws_conn_id = aws_conn_id
        self.region_name = region_name

    def poke(self, context: Context) -> bool:
        """Test a DynamoDB item for a matching attribute value."""
        key = {self.partition_key_name: self.partition_key_value}
        msg = (
            f"Checking table {self.table_name} for "
            f"item Partition Key: {self.partition_key_name}={self.partition_key_value}"
        )

        # Both the sort key name and value must be supplied for a composite key lookup.
        if self.sort_key_name and self.sort_key_value:
            key[self.sort_key_name] = self.sort_key_value
            msg += f"\nSort Key: {self.sort_key_name}={self.sort_key_value}"

        msg += f"\nattribute: {self.attribute_name}={self.attribute_value}"
        self.log.info(msg)

        table = self.hook.conn.Table(self.table_name)
        self.log.info("Table: %s", table)
        self.log.info("Key: %s", key)
        response = table.get_item(Key=key)
        try:
            # KeyError here means either the item does not exist yet ("Item"
            # absent from the response) or the attribute is not set — in both
            # cases the sensor should simply keep poking.
            item_value = response["Item"][self.attribute_name]
            self.log.info("Response: %s", response)
            self.log.info("Want: %s = %s", self.attribute_name, self.attribute_value)
            # BUG FIX: the original logged a plain string containing f-string
            # braces (missing the "f" prefix) and passed the *wanted* value,
            # so the actual retrieved value was never logged.
            self.log.info("Got: %s = %s", self.attribute_name, item_value)
            return item_value == self.attribute_value
        except KeyError:
            return False

    @cached_property
    def hook(self) -> DynamoDBHook:
        """Create and return a DynamoDBHook."""
        return DynamoDBHook(self.aws_conn_id, region_name=self.region_name)
diff --git a/airflow/providers/amazon/provider.yaml 
b/airflow/providers/amazon/provider.yaml
index 05ee544820..b46f17323d 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -97,6 +97,8 @@ integrations:
   - integration-name: Amazon DynamoDB
     external-doc-url: https://aws.amazon.com/dynamodb/
     logo: /integration-logos/aws/[email protected]
+    how-to-guide:
+      - /docs/apache-airflow-providers-amazon/operators/dynamodb.rst
     tags: [aws]
   - integration-name: Amazon EC2
     external-doc-url: https://aws.amazon.com/ec2/
@@ -347,6 +349,9 @@ sensors:
   - integration-name: AWS Database Migration Service
     python-modules:
       - airflow.providers.amazon.aws.sensors.dms
+  - integration-name: Amazon DynamoDB
+    python-modules:
+      - airflow.providers.amazon.aws.sensors.dynamodb
   - integration-name: Amazon EC2
     python-modules:
       - airflow.providers.amazon.aws.sensors.ec2
diff --git a/docs/apache-airflow-providers-amazon/operators/dynamodb.rst 
b/docs/apache-airflow-providers-amazon/operators/dynamodb.rst
new file mode 100644
index 0000000000..1a3693a455
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/operators/dynamodb.rst
@@ -0,0 +1,55 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+===============
+Amazon DynamoDB
+===============
+
+`Amazon DynamoDB <https://aws.amazon.com/dynamodb/>`__ is a
+fully managed, serverless, key-value NoSQL database designed to run
+high-performance applications at any scale. DynamoDB offers built-in security,
+continuous backups, automated multi-Region replication, in-memory caching, and
+data import and export tools.
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+
+Sensors
+-------
+
+.. _howto/sensor:DynamoDBValueSensor:
+
+Wait on Amazon DynamoDB item attribute value match
+==================================================
+
+Use the 
:class:`~airflow.providers.amazon.aws.sensors.dynamodb.DynamoDBValueSensor`
+to wait for the presence of a matching DynamoDB item's attribute/value pair.
+
+.. exampleinclude:: 
/../../tests/system/providers/amazon/aws/example_dynamodb.py
+    :language: python
+    :start-after: [START howto_sensor_dynamodb]
+    :dedent: 4
+    :end-before: [END howto_sensor_dynamodb]
+
+
+Reference
+---------
+
+* `AWS boto3 library documentation for DynamoDB 
<https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html>`__
diff --git a/tests/providers/amazon/aws/sensors/test_dynamodb.py 
b/tests/providers/amazon/aws/sensors/test_dynamodb.py
new file mode 100644
index 0000000000..05668f5444
--- /dev/null
+++ b/tests/providers/amazon/aws/sensors/test_dynamodb.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from moto import mock_dynamodb
+
+from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
+from airflow.providers.amazon.aws.sensors.dynamodb import DynamoDBValueSensor
+
+
class TestDynamoDBValueSensor:
    """Unit tests for DynamoDBValueSensor using moto-mocked DynamoDB tables."""

    def setup_method(self):
        # Shared fixture data: table/key/attribute names used by every test.
        self.table_name = "test_airflow"
        self.pk_name = "PK"
        self.pk_value = "PKTest"
        self.sk_name = "SK"
        self.sk_value = "SKTest"
        self.attribute_name = "Foo"
        self.attribute_value = "Bar"

        # The sensor is always built with a composite (partition + sort) key;
        # the partition-key-only test simply creates a table without a sort key.
        self.sensor = DynamoDBValueSensor(
            task_id="dynamodb_value_sensor",
            table_name=self.table_name,
            partition_key_name=self.pk_name,
            partition_key_value=self.pk_value,
            attribute_name=self.attribute_name,
            attribute_value=self.attribute_value,
            sort_key_name=self.sk_name,
            sort_key_value=self.sk_value,
        )

    @mock_dynamodb
    def test_sensor_with_pk(self):
        """Sensor pokes False until the item exists, then True (hash key only)."""
        ddb_hook = DynamoDBHook(table_name=self.table_name, table_keys=[self.pk_name])
        ddb_hook.conn.create_table(
            TableName=self.table_name,
            KeySchema=[{"AttributeName": self.pk_name, "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": self.pk_name, "AttributeType": "S"}],
            ProvisionedThroughput={"ReadCapacityUnits": 10, "WriteCapacityUnits": 10},
        )

        # No matching item yet -> sensor must not fire.
        assert not self.sensor.poke(None)

        ddb_hook.write_batch_data(
            [{self.pk_name: self.pk_value, self.attribute_name: self.attribute_value}]
        )

        # Item with the expected attribute value present -> sensor fires.
        assert self.sensor.poke(None)

    @mock_dynamodb
    def test_sensor_with_pk_and_sk(self):
        """Sensor pokes False until the item exists, then True (hash + range key)."""
        ddb_hook = DynamoDBHook(
            table_name=self.table_name, table_keys=[self.pk_name, self.sk_name]
        )
        ddb_hook.conn.create_table(
            TableName=self.table_name,
            KeySchema=[
                {"AttributeName": self.pk_name, "KeyType": "HASH"},
                {"AttributeName": self.sk_name, "KeyType": "RANGE"},
            ],
            AttributeDefinitions=[
                {"AttributeName": self.pk_name, "AttributeType": "S"},
                {"AttributeName": self.sk_name, "AttributeType": "S"},
            ],
            ProvisionedThroughput={"ReadCapacityUnits": 10, "WriteCapacityUnits": 10},
        )

        # No matching item yet -> sensor must not fire.
        assert not self.sensor.poke(None)

        ddb_hook.write_batch_data(
            [
                {
                    self.pk_name: self.pk_value,
                    self.sk_name: self.sk_value,
                    self.attribute_name: self.attribute_value,
                }
            ]
        )

        # Item with the expected attribute value present -> sensor fires.
        assert self.sensor.poke(None)
diff --git a/tests/system/providers/amazon/aws/example_dynamodb.py 
b/tests/system/providers/amazon/aws/example_dynamodb.py
new file mode 100644
index 0000000000..37bea8f2b9
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_dynamodb.py
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+import boto3
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.sensors.dynamodb import DynamoDBValueSensor
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, 
SystemTestContextBuilder
+
+DAG_ID = "example_dynamodbvaluesensor"
+sys_test_context_task = SystemTestContextBuilder().build()
+
+TABLE_ATTRIBUTES = [
+    {"AttributeName": "PK", "AttributeType": "S"},
+    {"AttributeName": "SK", "AttributeType": "S"},
+]
+TABLE_KEY_SCHEMA = [
+    {"AttributeName": "PK", "KeyType": "HASH"},
+    {"AttributeName": "SK", "KeyType": "RANGE"},
+]
+TABLE_THROUGHPUT = {"ReadCapacityUnits": 10, "WriteCapacityUnits": 10}
+
+
@task
def create_table(table_name: str):
    """Create the DynamoDB table and seed the item the sensor will poll for."""
    ddb = boto3.resource("dynamodb")
    table = ddb.create_table(
        AttributeDefinitions=TABLE_ATTRIBUTES,
        TableName=table_name,
        KeySchema=TABLE_KEY_SCHEMA,
        ProvisionedThroughput=TABLE_THROUGHPUT,
    )
    # BUG FIX: the waiter must be told which table to wait on; calling
    # wait() without TableName raises a ParamValidationError.
    boto3.client("dynamodb").get_waiter("table_exists").wait(TableName=table_name)
    table.put_item(Item={"PK": "Test", "SK": "2022-07-12T11:11:25-0400", "Value": "Testing"})
+
+
@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_table(table_name: str):
    """Tear down the DynamoDB table; runs regardless of upstream task state."""
    # BUG FIX: delete_table is a client-level operation (the service resource
    # does not expose it), and the waiter needs TableName to know what to poll.
    client = boto3.client("dynamodb")
    client.delete_table(TableName=table_name)
    client.get_waiter("table_not_exists").wait(TableName=table_name)
+
+
with DAG(
    dag_id=DAG_ID,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    test_context = sys_test_context_task()
    env_id = test_context[ENV_ID_KEY]
    # Table is namespaced by the environment id so concurrent test runs don't collide.
    table_name = f"{env_id}-dynamodb-table"
    create_table = create_table(table_name=table_name)

    # [START howto_sensor_dynamodb]
    dynamodb_sensor = DynamoDBValueSensor(
        task_id="waiting_for_dynamodb_item_value",
        poke_interval=30,
        timeout=120,
        soft_fail=False,
        retries=10,
        # BUG FIX: poll the table this DAG actually creates; the original
        # hard-coded "AirflowSensorTest", a table that is never created here.
        table_name=table_name,
        partition_key_name="PK",
        partition_key_value="Test",
        sort_key_name="SK",
        sort_key_value="2022-07-12T11:11:25-0400",
        attribute_name="Value",
        attribute_value="Testing",
    )
    # [END howto_sensor_dynamodb]

    chain(
        # TEST SETUP
        test_context,
        # TEST BODY
        create_table,
        dynamodb_sensor,
        # TEST TEARDOWN
        delete_table(table_name=table_name),
    )

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()


from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)

Reply via email to