This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b241577c9c Use base aws classes in Amazon DynamoDB Sensors (#36770)
b241577c9c is described below
commit b241577c9c2ef330709d66968e7bdede63c5ffbe
Author: Andrey Anshin <[email protected]>
AuthorDate: Sun Jan 14 23:56:07 2024 +0400
Use base aws classes in Amazon DynamoDB Sensors (#36770)
---
airflow/providers/amazon/aws/sensors/dynamodb.py | 30 ++++++++---------
.../operators/dynamodb.rst | 4 +++
.../providers/amazon/aws/sensors/test_dynamodb.py | 39 ++++++++++++++++++++++
3 files changed, 58 insertions(+), 15 deletions(-)
diff --git a/airflow/providers/amazon/aws/sensors/dynamodb.py b/airflow/providers/amazon/aws/sensors/dynamodb.py
index 87a4faf1ab..dbb7f97304 100644
--- a/airflow/providers/amazon/aws/sensors/dynamodb.py
+++ b/airflow/providers/amazon/aws/sensors/dynamodb.py
@@ -16,17 +16,17 @@
# under the License.
from __future__ import annotations
-from functools import cached_property
from typing import TYPE_CHECKING, Any, Iterable, Sequence
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
-from airflow.sensors.base import BaseSensorOperator
+from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
+from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
if TYPE_CHECKING:
from airflow.utils.context import Context
-class DynamoDBValueSensor(BaseSensorOperator):
+class DynamoDBValueSensor(AwsBaseSensor[DynamoDBHook]):
"""
Waits for an attribute value to be present for an item in a DynamoDB table.
@@ -41,11 +41,20 @@ class DynamoDBValueSensor(BaseSensorOperator):
:param attribute_value: DynamoDB attribute value
:param sort_key_name: (optional) DynamoDB sort key name
:param sort_key_value: (optional) DynamoDB sort key value
- :param aws_conn_id: aws connection to use
- :param region_name: aws region to use
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ If this is ``None`` or empty then the default boto3 behaviour is used. If
+ running Airflow in a distributed manner and aws_conn_id is None or
+ empty, then default boto3 configuration would be used (and must be
+ maintained on each worker node).
+ :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+ :param verify: Whether or not to verify SSL certificates. See:
+ https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+ :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
+ https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
"""
- template_fields: Sequence[str] = (
+ aws_hook_class = DynamoDBHook
+ template_fields: Sequence[str] = aws_template_fields(
"table_name",
"partition_key_name",
"partition_key_value",
@@ -64,8 +73,6 @@ class DynamoDBValueSensor(BaseSensorOperator):
attribute_value: str | Iterable[str],
sort_key_name: str | None = None,
sort_key_value: str | None = None,
- aws_conn_id: str | None = DynamoDBHook.default_conn_name,
- region_name: str | None = None,
**kwargs: Any,
):
super().__init__(**kwargs)
@@ -76,8 +83,6 @@ class DynamoDBValueSensor(BaseSensorOperator):
self.attribute_value = attribute_value
self.sort_key_name = sort_key_name
self.sort_key_value = sort_key_value
- self.aws_conn_id = aws_conn_id
- self.region_name = region_name
def poke(self, context: Context) -> bool:
"""Test DynamoDB item for matching attribute value."""
@@ -108,8 +113,3 @@ class DynamoDBValueSensor(BaseSensorOperator):
)
except KeyError:
return False
-
- @cached_property
- def hook(self) -> DynamoDBHook:
- """Create and return a DynamoDBHook."""
- return DynamoDBHook(self.aws_conn_id, region_name=self.region_name)
diff --git a/docs/apache-airflow-providers-amazon/operators/dynamodb.rst b/docs/apache-airflow-providers-amazon/operators/dynamodb.rst
index 42d41bc864..d899f0de92 100644
--- a/docs/apache-airflow-providers-amazon/operators/dynamodb.rst
+++ b/docs/apache-airflow-providers-amazon/operators/dynamodb.rst
@@ -30,6 +30,10 @@ Prerequisite Tasks
.. include:: ../_partials/prerequisite_tasks.rst
+Generic Parameters
+------------------
+
+.. include:: ../_partials/generic_parameters.rst
Sensors
-------
diff --git a/tests/providers/amazon/aws/sensors/test_dynamodb.py b/tests/providers/amazon/aws/sensors/test_dynamodb.py
index 89d2b29b20..1508354657 100644
--- a/tests/providers/amazon/aws/sensors/test_dynamodb.py
+++ b/tests/providers/amazon/aws/sensors/test_dynamodb.py
@@ -117,6 +117,45 @@ class TestDynamoDBMultipleValuesSensor:
sort_key_value=self.sk_value,
)
+ def test_init(self):
+ sensor = DynamoDBValueSensor(
+ task_id="dynamodb_value_sensor_init",
+ table_name=self.table_name,
+ partition_key_name=self.pk_name,
+ partition_key_value=self.pk_value,
+ attribute_name=self.attribute_name,
+ attribute_value=self.attribute_value,
+ sort_key_name=self.sk_name,
+ sort_key_value=self.sk_value,
+ # Generic hooks parameters
+ aws_conn_id="fake-conn-id",
+ region_name="cn-north-1",
+ verify=False,
+ botocore_config={"read_timeout": 42},
+ )
+ assert sensor.hook.client_type is None
+ assert sensor.hook.resource_type == "dynamodb"
+ assert sensor.hook.aws_conn_id == "fake-conn-id"
+ assert sensor.hook._region_name == "cn-north-1"
+ assert sensor.hook._verify is False
+ assert sensor.hook._config is not None
+ assert sensor.hook._config.read_timeout == 42
+
+ sensor = DynamoDBValueSensor(
+ task_id="dynamodb_value_sensor_init",
+ table_name=self.table_name,
+ partition_key_name=self.pk_name,
+ partition_key_value=self.pk_value,
+ attribute_name=self.attribute_name,
+ attribute_value=self.attribute_value,
+ sort_key_name=self.sk_name,
+ sort_key_value=self.sk_value,
+ )
+ assert sensor.hook.aws_conn_id == "aws_default"
+ assert sensor.hook._region_name is None
+ assert sensor.hook._verify is None
+ assert sensor.hook._config is None
+
@mock_dynamodb
def test_sensor_with_pk(self):
hook = DynamoDBHook(table_name=self.table_name, table_keys=[self.pk_name])