This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9059f72668 Enhance `attribute_value` in `DynamoDBValueSensor` to accept list (#35831)
9059f72668 is described below
commit 9059f72668fb85253b8b4e3e9fb5350d621b639d
Author: Felipe Lolas <[email protected]>
AuthorDate: Sat Nov 25 05:03:23 2023 -0300
Enhance `attribute_value` in `DynamoDBValueSensor` to accept list (#35831)
---
airflow/providers/amazon/aws/sensors/dynamodb.py | 11 ++--
.../operators/dynamodb.rst | 15 +++++
.../providers/amazon/aws/sensors/test_dynamodb.py | 73 ++++++++++++++++++++++
.../providers/amazon/aws/example_dynamodb.py | 14 +++++
4 files changed, 108 insertions(+), 5 deletions(-)
diff --git a/airflow/providers/amazon/aws/sensors/dynamodb.py b/airflow/providers/amazon/aws/sensors/dynamodb.py
index bd820572ac..87a4faf1ab 100644
--- a/airflow/providers/amazon/aws/sensors/dynamodb.py
+++ b/airflow/providers/amazon/aws/sensors/dynamodb.py
@@ -17,7 +17,7 @@
from __future__ import annotations
from functools import cached_property
-from typing import TYPE_CHECKING, Any, Sequence
+from typing import TYPE_CHECKING, Any, Iterable, Sequence
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
from airflow.sensors.base import BaseSensorOperator
@@ -61,7 +61,7 @@ class DynamoDBValueSensor(BaseSensorOperator):
partition_key_name: str,
partition_key_value: str,
attribute_name: str,
- attribute_value: str,
+ attribute_value: str | Iterable[str],
sort_key_name: str | None = None,
sort_key_value: str | None = None,
aws_conn_id: str | None = DynamoDBHook.default_conn_name,
@@ -99,12 +99,13 @@ class DynamoDBValueSensor(BaseSensorOperator):
self.log.info("Key: %s", key)
response = table.get_item(Key=key)
try:
+ item_attribute_value = response["Item"][self.attribute_name]
self.log.info("Response: %s", response)
self.log.info("Want: %s = %s", self.attribute_name,
self.attribute_value)
- self.log.info(
- "Got: {response['Item'][self.attribute_name]} = %s",
response["Item"][self.attribute_name]
+ self.log.info("Got: {response['Item'][self.attribute_name]} = %s",
item_attribute_value)
+ return item_attribute_value in (
+ [self.attribute_value] if isinstance(self.attribute_value,
str) else self.attribute_value
)
- return response["Item"][self.attribute_name] ==
self.attribute_value
except KeyError:
return False
diff --git a/docs/apache-airflow-providers-amazon/operators/dynamodb.rst b/docs/apache-airflow-providers-amazon/operators/dynamodb.rst
index f6df741706..42d41bc864 100644
--- a/docs/apache-airflow-providers-amazon/operators/dynamodb.rst
+++ b/docs/apache-airflow-providers-amazon/operators/dynamodb.rst
@@ -42,12 +42,27 @@ Wait on Amazon DynamoDB item attribute value match
Use the
:class:`~airflow.providers.amazon.aws.sensors.dynamodb.DynamoDBValueSensor`
to wait for the presence of a matching DynamoDB item's attribute/value pair.
+Wait for a Single Attribute Value Match:
+----------------------------------------
+
+This example shows how to use ``DynamoDBValueSensor`` to wait for a specific attribute/value pair in a DynamoDB item.
+
.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dynamodb.py
:language: python
:start-after: [START howto_sensor_dynamodb_value]
:dedent: 4
:end-before: [END howto_sensor_dynamodb_value]
+Wait for Any Value from a List of Attribute Values:
+---------------------------------------------------
+
+In this example, the sensor waits for the DynamoDB item to have an attribute that matches any value from a provided list.
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dynamodb.py
+ :language: python
+ :start-after: [START howto_sensor_dynamodb_any_value]
+ :dedent: 4
+ :end-before: [END howto_sensor_dynamodb_any_value]
Reference
---------
diff --git a/tests/providers/amazon/aws/sensors/test_dynamodb.py b/tests/providers/amazon/aws/sensors/test_dynamodb.py
index 05668f5444..89d2b29b20 100644
--- a/tests/providers/amazon/aws/sensors/test_dynamodb.py
+++ b/tests/providers/amazon/aws/sensors/test_dynamodb.py
@@ -17,11 +17,14 @@
from __future__ import annotations
+import pytest
from moto import mock_dynamodb
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
from airflow.providers.amazon.aws.sensors.dynamodb import DynamoDBValueSensor
+pytestmark = pytest.mark.db_test
+
class TestDynamoDBValueSensor:
def setup_method(self):
@@ -91,3 +94,73 @@ class TestDynamoDBValueSensor:
hook.write_batch_data(items)
assert self.sensor.poke(None)
+
+
+class TestDynamoDBMultipleValuesSensor:
+ def setup_method(self):
+ self.table_name = "test_airflow"
+ self.pk_name = "PK"
+ self.pk_value = "PKTest"
+ self.sk_name = "SK"
+ self.sk_value = "SKTest"
+ self.attribute_name = "Foo"
+ self.attribute_value = ["Bar1", "Bar2", "Bar3"]
+
+ self.sensor = DynamoDBValueSensor(
+ task_id="dynamodb_value_sensor",
+ table_name=self.table_name,
+ partition_key_name=self.pk_name,
+ partition_key_value=self.pk_value,
+ attribute_name=self.attribute_name,
+ attribute_value=self.attribute_value,
+ sort_key_name=self.sk_name,
+ sort_key_value=self.sk_value,
+ )
+
+ @mock_dynamodb
+ def test_sensor_with_pk(self):
+ hook = DynamoDBHook(table_name=self.table_name,
table_keys=[self.pk_name])
+
+ hook.conn.create_table(
+ TableName=self.table_name,
+ KeySchema=[{"AttributeName": self.pk_name, "KeyType": "HASH"}],
+ AttributeDefinitions=[{"AttributeName": self.pk_name,
"AttributeType": "S"}],
+ ProvisionedThroughput={"ReadCapacityUnits": 10,
"WriteCapacityUnits": 10},
+ )
+
+ assert not self.sensor.poke(None)
+
+ items = [{self.pk_name: self.pk_value, self.attribute_name:
self.attribute_value[1]}]
+ hook.write_batch_data(items)
+
+ assert self.sensor.poke(None)
+
+ @mock_dynamodb
+ def test_sensor_with_pk_and_sk(self):
+ hook = DynamoDBHook(table_name=self.table_name,
table_keys=[self.pk_name, self.sk_name])
+
+ hook.conn.create_table(
+ TableName=self.table_name,
+ KeySchema=[
+ {"AttributeName": self.pk_name, "KeyType": "HASH"},
+ {"AttributeName": self.sk_name, "KeyType": "RANGE"},
+ ],
+ AttributeDefinitions=[
+ {"AttributeName": self.pk_name, "AttributeType": "S"},
+ {"AttributeName": self.sk_name, "AttributeType": "S"},
+ ],
+ ProvisionedThroughput={"ReadCapacityUnits": 10,
"WriteCapacityUnits": 10},
+ )
+
+ assert not self.sensor.poke(None)
+
+ items = [
+ {
+ self.pk_name: self.pk_value,
+ self.sk_name: self.sk_value,
+ self.attribute_name: self.attribute_value[1],
+ }
+ ]
+ hook.write_batch_data(items)
+
+ assert self.sensor.poke(None)
diff --git a/tests/system/providers/amazon/aws/example_dynamodb.py b/tests/system/providers/amazon/aws/example_dynamodb.py
index 8316cab0e6..6c3d770c9e 100644
--- a/tests/system/providers/amazon/aws/example_dynamodb.py
+++ b/tests/system/providers/amazon/aws/example_dynamodb.py
@@ -92,12 +92,26 @@ with DAG(
)
# [END howto_sensor_dynamodb_value]
+ # [START howto_sensor_dynamodb_any_value]
+ dynamodb_sensor_any_value = DynamoDBValueSensor(
+ task_id="waiting_for_dynamodb_item_any_value",
+ table_name=table_name,
+ partition_key_name=PK_ATTRIBUTE_NAME,
+ partition_key_value="Test",
+ sort_key_name=SK_ATTRIBUTE_NAME,
+ sort_key_value="2022-07-12T11:11:25-0400",
+ attribute_name="Value",
+ attribute_value=["Foo", "Testing", "Bar"],
+ )
+ # [END howto_sensor_dynamodb_any_value]
+
chain(
# TEST SETUP
test_context,
create_table,
# TEST BODY
dynamodb_sensor,
+ dynamodb_sensor_any_value,
# TEST TEARDOWN
delete_table,
)