This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9059f72668 Enhance `attribute_value` in `DynamoDBValueSensor` to accept list (#35831)
9059f72668 is described below

commit 9059f72668fb85253b8b4e3e9fb5350d621b639d
Author: Felipe Lolas <[email protected]>
AuthorDate: Sat Nov 25 05:03:23 2023 -0300

    Enhance `attribute_value` in `DynamoDBValueSensor` to accept list (#35831)
---
 airflow/providers/amazon/aws/sensors/dynamodb.py   | 11 ++--
 .../operators/dynamodb.rst                         | 15 +++++
 .../providers/amazon/aws/sensors/test_dynamodb.py  | 73 ++++++++++++++++++++++
 .../providers/amazon/aws/example_dynamodb.py       | 14 +++++
 4 files changed, 108 insertions(+), 5 deletions(-)

diff --git a/airflow/providers/amazon/aws/sensors/dynamodb.py b/airflow/providers/amazon/aws/sensors/dynamodb.py
index bd820572ac..87a4faf1ab 100644
--- a/airflow/providers/amazon/aws/sensors/dynamodb.py
+++ b/airflow/providers/amazon/aws/sensors/dynamodb.py
@@ -17,7 +17,7 @@
 from __future__ import annotations
 
 from functools import cached_property
-from typing import TYPE_CHECKING, Any, Sequence
+from typing import TYPE_CHECKING, Any, Iterable, Sequence
 
 from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
 from airflow.sensors.base import BaseSensorOperator
@@ -61,7 +61,7 @@ class DynamoDBValueSensor(BaseSensorOperator):
         partition_key_name: str,
         partition_key_value: str,
         attribute_name: str,
-        attribute_value: str,
+        attribute_value: str | Iterable[str],
         sort_key_name: str | None = None,
         sort_key_value: str | None = None,
         aws_conn_id: str | None = DynamoDBHook.default_conn_name,
@@ -99,12 +99,13 @@ class DynamoDBValueSensor(BaseSensorOperator):
         self.log.info("Key: %s", key)
         response = table.get_item(Key=key)
         try:
+            item_attribute_value = response["Item"][self.attribute_name]
             self.log.info("Response: %s", response)
             self.log.info("Want: %s = %s", self.attribute_name, self.attribute_value)
-            self.log.info(
-                "Got: {response['Item'][self.attribute_name]} = %s", response["Item"][self.attribute_name]
+            self.log.info("Got: {response['Item'][self.attribute_name]} = %s", item_attribute_value)
+            return item_attribute_value in (
+                [self.attribute_value] if isinstance(self.attribute_value, str) else self.attribute_value
             )
-            return response["Item"][self.attribute_name] == self.attribute_value
         except KeyError:
             return False
 
diff --git a/docs/apache-airflow-providers-amazon/operators/dynamodb.rst b/docs/apache-airflow-providers-amazon/operators/dynamodb.rst
index f6df741706..42d41bc864 100644
--- a/docs/apache-airflow-providers-amazon/operators/dynamodb.rst
+++ b/docs/apache-airflow-providers-amazon/operators/dynamodb.rst
@@ -42,12 +42,27 @@ Wait on Amazon DynamoDB item attribute value match
 Use the :class:`~airflow.providers.amazon.aws.sensors.dynamodb.DynamoDBValueSensor`
 to wait for the presence of a matching DynamoDB item's attribute/value pair.
 
+Wait for a Single Attribute Value Match:
+----------------------------------------
+
+This example shows how to use ``DynamoDBValueSensor`` to wait for a specific attribute/value pair in a DynamoDB item.
+
 .. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dynamodb.py
     :language: python
     :start-after: [START howto_sensor_dynamodb_value]
     :dedent: 4
     :end-before: [END howto_sensor_dynamodb_value]
 
+Wait for Any Value from a List of Attribute Values:
+---------------------------------------------------
+
+In this example, the sensor waits for the DynamoDB item to have an attribute that matches any value from a provided list.
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dynamodb.py
+    :language: python
+    :start-after: [START howto_sensor_dynamodb_any_value]
+    :dedent: 4
+    :end-before: [END howto_sensor_dynamodb_any_value]
 
 Reference
 ---------
diff --git a/tests/providers/amazon/aws/sensors/test_dynamodb.py b/tests/providers/amazon/aws/sensors/test_dynamodb.py
index 05668f5444..89d2b29b20 100644
--- a/tests/providers/amazon/aws/sensors/test_dynamodb.py
+++ b/tests/providers/amazon/aws/sensors/test_dynamodb.py
@@ -17,11 +17,14 @@
 
 from __future__ import annotations
 
+import pytest
 from moto import mock_dynamodb
 
 from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
 from airflow.providers.amazon.aws.sensors.dynamodb import DynamoDBValueSensor
 
+pytestmark = pytest.mark.db_test
+
 
 class TestDynamoDBValueSensor:
     def setup_method(self):
@@ -91,3 +94,73 @@ class TestDynamoDBValueSensor:
         hook.write_batch_data(items)
 
         assert self.sensor.poke(None)
+
+
+class TestDynamoDBMultipleValuesSensor:
+    def setup_method(self):
+        self.table_name = "test_airflow"
+        self.pk_name = "PK"
+        self.pk_value = "PKTest"
+        self.sk_name = "SK"
+        self.sk_value = "SKTest"
+        self.attribute_name = "Foo"
+        self.attribute_value = ["Bar1", "Bar2", "Bar3"]
+
+        self.sensor = DynamoDBValueSensor(
+            task_id="dynamodb_value_sensor",
+            table_name=self.table_name,
+            partition_key_name=self.pk_name,
+            partition_key_value=self.pk_value,
+            attribute_name=self.attribute_name,
+            attribute_value=self.attribute_value,
+            sort_key_name=self.sk_name,
+            sort_key_value=self.sk_value,
+        )
+
+    @mock_dynamodb
+    def test_sensor_with_pk(self):
+        hook = DynamoDBHook(table_name=self.table_name, table_keys=[self.pk_name])
+
+        hook.conn.create_table(
+            TableName=self.table_name,
+            KeySchema=[{"AttributeName": self.pk_name, "KeyType": "HASH"}],
+            AttributeDefinitions=[{"AttributeName": self.pk_name, "AttributeType": "S"}],
+            ProvisionedThroughput={"ReadCapacityUnits": 10, "WriteCapacityUnits": 10},
+        )
+
+        assert not self.sensor.poke(None)
+
+        items = [{self.pk_name: self.pk_value, self.attribute_name: self.attribute_value[1]}]
+        hook.write_batch_data(items)
+
+        assert self.sensor.poke(None)
+
+    @mock_dynamodb
+    def test_sensor_with_pk_and_sk(self):
+        hook = DynamoDBHook(table_name=self.table_name, table_keys=[self.pk_name, self.sk_name])
+
+        hook.conn.create_table(
+            TableName=self.table_name,
+            KeySchema=[
+                {"AttributeName": self.pk_name, "KeyType": "HASH"},
+                {"AttributeName": self.sk_name, "KeyType": "RANGE"},
+            ],
+            AttributeDefinitions=[
+                {"AttributeName": self.pk_name, "AttributeType": "S"},
+                {"AttributeName": self.sk_name, "AttributeType": "S"},
+            ],
+            ProvisionedThroughput={"ReadCapacityUnits": 10, "WriteCapacityUnits": 10},
+        )
+
+        assert not self.sensor.poke(None)
+
+        items = [
+            {
+                self.pk_name: self.pk_value,
+                self.sk_name: self.sk_value,
+                self.attribute_name: self.attribute_value[1],
+            }
+        ]
+        hook.write_batch_data(items)
+
+        assert self.sensor.poke(None)
diff --git a/tests/system/providers/amazon/aws/example_dynamodb.py b/tests/system/providers/amazon/aws/example_dynamodb.py
index 8316cab0e6..6c3d770c9e 100644
--- a/tests/system/providers/amazon/aws/example_dynamodb.py
+++ b/tests/system/providers/amazon/aws/example_dynamodb.py
@@ -92,12 +92,26 @@ with DAG(
     )
     # [END howto_sensor_dynamodb_value]
 
+    # [START howto_sensor_dynamodb_any_value]
+    dynamodb_sensor_any_value = DynamoDBValueSensor(
+        task_id="waiting_for_dynamodb_item_any_value",
+        table_name=table_name,
+        partition_key_name=PK_ATTRIBUTE_NAME,
+        partition_key_value="Test",
+        sort_key_name=SK_ATTRIBUTE_NAME,
+        sort_key_value="2022-07-12T11:11:25-0400",
+        attribute_name="Value",
+        attribute_value=["Foo", "Testing", "Bar"],
+    )
+    # [END howto_sensor_dynamodb_any_value]
+
     chain(
         # TEST SETUP
         test_context,
         create_table,
         # TEST BODY
         dynamodb_sensor,
+        dynamodb_sensor_any_value,
         # TEST TEARDOWN
         delete_table,
     )

Reply via email to