This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b241577c9c Use base aws classes in Amazon DynamoDB Sensors (#36770)
b241577c9c is described below

commit b241577c9c2ef330709d66968e7bdede63c5ffbe
Author: Andrey Anshin <[email protected]>
AuthorDate: Sun Jan 14 23:56:07 2024 +0400

    Use base aws classes in Amazon DynamoDB Sensors (#36770)
---
 airflow/providers/amazon/aws/sensors/dynamodb.py   | 30 ++++++++---------
 .../operators/dynamodb.rst                         |  4 +++
 .../providers/amazon/aws/sensors/test_dynamodb.py  | 39 ++++++++++++++++++++++
 3 files changed, 58 insertions(+), 15 deletions(-)

diff --git a/airflow/providers/amazon/aws/sensors/dynamodb.py b/airflow/providers/amazon/aws/sensors/dynamodb.py
index 87a4faf1ab..dbb7f97304 100644
--- a/airflow/providers/amazon/aws/sensors/dynamodb.py
+++ b/airflow/providers/amazon/aws/sensors/dynamodb.py
@@ -16,17 +16,17 @@
 # under the License.
 from __future__ import annotations
 
-from functools import cached_property
 from typing import TYPE_CHECKING, Any, Iterable, Sequence
 
 from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
-from airflow.sensors.base import BaseSensorOperator
+from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
+from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
 
-class DynamoDBValueSensor(BaseSensorOperator):
+class DynamoDBValueSensor(AwsBaseSensor[DynamoDBHook]):
     """
     Waits for an attribute value to be present for an item in a DynamoDB table.
 
@@ -41,11 +41,20 @@ class DynamoDBValueSensor(BaseSensorOperator):
     :param attribute_value: DynamoDB attribute value
     :param sort_key_name: (optional) DynamoDB sort key name
     :param sort_key_value: (optional) DynamoDB sort key value
-    :param aws_conn_id: aws connection to use
-    :param region_name: aws region to use
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param verify: Whether or not to verify SSL certificates. See:
+        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
+        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
     """
 
-    template_fields: Sequence[str] = (
+    aws_hook_class = DynamoDBHook
+    template_fields: Sequence[str] = aws_template_fields(
         "table_name",
         "partition_key_name",
         "partition_key_value",
@@ -64,8 +73,6 @@ class DynamoDBValueSensor(BaseSensorOperator):
         attribute_value: str | Iterable[str],
         sort_key_name: str | None = None,
         sort_key_value: str | None = None,
-        aws_conn_id: str | None = DynamoDBHook.default_conn_name,
-        region_name: str | None = None,
         **kwargs: Any,
     ):
         super().__init__(**kwargs)
@@ -76,8 +83,6 @@ class DynamoDBValueSensor(BaseSensorOperator):
         self.attribute_value = attribute_value
         self.sort_key_name = sort_key_name
         self.sort_key_value = sort_key_value
-        self.aws_conn_id = aws_conn_id
-        self.region_name = region_name
 
     def poke(self, context: Context) -> bool:
         """Test DynamoDB item for matching attribute value."""
@@ -108,8 +113,3 @@ class DynamoDBValueSensor(BaseSensorOperator):
             )
         except KeyError:
             return False
-
-    @cached_property
-    def hook(self) -> DynamoDBHook:
-        """Create and return a DynamoDBHook."""
-        return DynamoDBHook(self.aws_conn_id, region_name=self.region_name)
diff --git a/docs/apache-airflow-providers-amazon/operators/dynamodb.rst b/docs/apache-airflow-providers-amazon/operators/dynamodb.rst
index 42d41bc864..d899f0de92 100644
--- a/docs/apache-airflow-providers-amazon/operators/dynamodb.rst
+++ b/docs/apache-airflow-providers-amazon/operators/dynamodb.rst
@@ -30,6 +30,10 @@ Prerequisite Tasks
 
 .. include:: ../_partials/prerequisite_tasks.rst
 
+Generic Parameters
+------------------
+
+.. include:: ../_partials/generic_parameters.rst
 
 Sensors
 -------
diff --git a/tests/providers/amazon/aws/sensors/test_dynamodb.py b/tests/providers/amazon/aws/sensors/test_dynamodb.py
index 89d2b29b20..1508354657 100644
--- a/tests/providers/amazon/aws/sensors/test_dynamodb.py
+++ b/tests/providers/amazon/aws/sensors/test_dynamodb.py
@@ -117,6 +117,45 @@ class TestDynamoDBMultipleValuesSensor:
             sort_key_value=self.sk_value,
         )
 
+    def test_init(self):
+        sensor = DynamoDBValueSensor(
+            task_id="dynamodb_value_sensor_init",
+            table_name=self.table_name,
+            partition_key_name=self.pk_name,
+            partition_key_value=self.pk_value,
+            attribute_name=self.attribute_name,
+            attribute_value=self.attribute_value,
+            sort_key_name=self.sk_name,
+            sort_key_value=self.sk_value,
+            # Generic hooks parameters
+            aws_conn_id="fake-conn-id",
+            region_name="cn-north-1",
+            verify=False,
+            botocore_config={"read_timeout": 42},
+        )
+        assert sensor.hook.client_type is None
+        assert sensor.hook.resource_type == "dynamodb"
+        assert sensor.hook.aws_conn_id == "fake-conn-id"
+        assert sensor.hook._region_name == "cn-north-1"
+        assert sensor.hook._verify is False
+        assert sensor.hook._config is not None
+        assert sensor.hook._config.read_timeout == 42
+
+        sensor = DynamoDBValueSensor(
+            task_id="dynamodb_value_sensor_init",
+            table_name=self.table_name,
+            partition_key_name=self.pk_name,
+            partition_key_value=self.pk_value,
+            attribute_name=self.attribute_name,
+            attribute_value=self.attribute_value,
+            sort_key_name=self.sk_name,
+            sort_key_value=self.sk_value,
+        )
+        assert sensor.hook.aws_conn_id == "aws_default"
+        assert sensor.hook._region_name is None
+        assert sensor.hook._verify is None
+        assert sensor.hook._config is None
+
     @mock_dynamodb
     def test_sensor_with_pk(self):
         hook = DynamoDBHook(table_name=self.table_name, table_keys=[self.pk_name])

Reply via email to