This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ea682382bc Adding Glue Data Quality Rule Recommendation Run (#40014)
ea682382bc is described below
commit ea682382bc3570a820d400994114e8b0060add66
Author: GPK <[email protected]>
AuthorDate: Tue Jun 4 18:45:26 2024 +0100
Adding Glue Data Quality Rule Recommendation Run (#40014)
---
airflow/providers/amazon/aws/hooks/glue.py | 23 ++++
airflow/providers/amazon/aws/operators/glue.py | 146 +++++++++++++++++++++
airflow/providers/amazon/aws/sensors/glue.py | 125 +++++++++++++++++-
airflow/providers/amazon/aws/triggers/glue.py | 35 +++++
airflow/providers/amazon/aws/waiters/glue.json | 49 +++++++
.../operators/glue.rst | 28 ++++
tests/providers/amazon/aws/hooks/test_glue.py | 21 +++
tests/providers/amazon/aws/operators/test_glue.py | 108 +++++++++++++++
.../amazon/aws/sensors/test_glue_data_quality.py | 138 ++++++++++++++++++-
tests/providers/amazon/aws/triggers/test_glue.py | 28 ++++
tests/providers/amazon/aws/waiters/test_glue.py | 38 +++++-
.../amazon/aws/example_glue_data_quality.py | 2 +-
...ample_glue_data_quality_with_recommendation.py} | 57 ++++----
13 files changed, 764 insertions(+), 34 deletions(-)
diff --git a/airflow/providers/amazon/aws/hooks/glue.py b/airflow/providers/amazon/aws/hooks/glue.py
index f81dde2d11..08a96836da 100644
--- a/airflow/providers/amazon/aws/hooks/glue.py
+++ b/airflow/providers/amazon/aws/hooks/glue.py
@@ -530,3 +530,26 @@ class GlueDataQualityHook(AwsBaseHook):
raise AirflowException(
"AWS Glue data quality ruleset evaluation run failed for one
or more rules"
)
+
+ def log_recommendation_results(self, run_id: str) -> None:
+ """
+        Log the outcome of a recommendation run. A recommendation run generates multiple rules against a data source (Glue table) in Data Quality Definition Language (DQDL) format, for example:
+
+ Rules = [
+ IsComplete "NAME",
+ ColumnLength "EMP_ID" between 1 and 12,
+ IsUnique "EMP_ID",
+ ColumnValues "INCOME" > 50000
+ ]
+ """
+        result = self.conn.get_data_quality_rule_recommendation_run(RunId=run_id)
+
+ if result.get("RecommendedRuleset"):
+ self.log.info(
+ "AWS Glue data quality recommended rules for DatabaseName: %s
TableName: %s",
+ result["DataSource"]["GlueTable"]["DatabaseName"],
+ result["DataSource"]["GlueTable"]["TableName"],
+ )
+ self.log.info(result["RecommendedRuleset"])
+ else:
+ self.log.info("AWS Glue data quality, no recommended rules
available for RunId: %s", run_id)
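
A minimal sketch of driving the new hook method directly (the run ID and
connection ID below are hypothetical placeholders; valid AWS credentials are
assumed):

    from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook

    hook = GlueDataQualityHook(aws_conn_id="aws_default")
    # Logs the recommended DQDL ruleset, or a "no recommended rules" message.
    hook.log_recommendation_results(run_id="dqrun-0123456789abcdef")
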
diff --git a/airflow/providers/amazon/aws/operators/glue.py b/airflow/providers/amazon/aws/operators/glue.py
index cd681147bb..90c8f5b30f 100644
--- a/airflow/providers/amazon/aws/operators/glue.py
+++ b/airflow/providers/amazon/aws/operators/glue.py
@@ -32,6 +32,7 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.links.glue import GlueJobRunDetailsLink
from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
from airflow.providers.amazon.aws.triggers.glue import (
+ GlueDataQualityRuleRecommendationRunCompleteTrigger,
GlueDataQualityRuleSetEvaluationRunCompleteTrigger,
GlueJobCompleteTrigger,
)
@@ -499,3 +500,148 @@ class GlueDataQualityRuleSetEvaluationRunOperator(AwsBaseOperator[GlueDataQualit
)
return event["evaluation_run_id"]
+
+
+class GlueDataQualityRuleRecommendationRunOperator(AwsBaseOperator[GlueDataQualityHook]):
+ """
+    Starts a recommendation run that is used to generate rules; Glue Data Quality analyzes the data and comes up with recommendations for a potential ruleset.
+
+ Recommendation runs are automatically deleted after 90 days.
+
+ .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:GlueDataQualityRuleRecommendationRunOperator`
+
+    :param datasource: The data source (Glue table) associated with this run. (templated)
+    :param role: IAM role supplied for job execution. (templated)
+    :param number_of_workers: The number of G.1X workers to be used in the run. (default: 5)
+    :param timeout: The timeout for a run in minutes. This is the maximum time that a run can consume resources
+        before it is terminated and enters TIMEOUT status. (default: 2,880)
+    :param show_results: Displays the recommended ruleset (a set of rules) when the recommendation run completes. (default: True)
+    :param recommendation_run_kwargs: Extra arguments for the recommendation run. (templated)
+    :param wait_for_completion: Whether to wait for the job to stop. (default: True)
+    :param waiter_delay: Time in seconds to wait between status checks. (default: 60)
+    :param waiter_max_attempts: Maximum number of attempts to check for job completion. (default: 20)
+    :param deferrable: If True, the operator will wait asynchronously for the job to stop.
+        This implies waiting for completion. This mode requires the aiobotocore module to be installed.
+        (default: False)
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param verify: Whether or not to verify SSL certificates. See:
+        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
+        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+ """
+
+ aws_hook_class = GlueDataQualityHook
+ template_fields: Sequence[str] = (
+ "datasource",
+ "role",
+ "recommendation_run_kwargs",
+ )
+
+    template_fields_renderers = {"datasource": "json", "recommendation_run_kwargs": "json"}
+
+ ui_color = "#ededed"
+
+ def __init__(
+ self,
+ *,
+ datasource: dict,
+ role: str,
+ number_of_workers: int = 5,
+ timeout: int = 2880,
+ show_results: bool = True,
+ recommendation_run_kwargs: dict[str, Any] | None = None,
+ wait_for_completion: bool = True,
+ waiter_delay: int = 60,
+ waiter_max_attempts: int = 20,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+ aws_conn_id: str | None = "aws_default",
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.datasource = datasource
+ self.role = role
+ self.number_of_workers = number_of_workers
+ self.timeout = timeout
+ self.show_results = show_results
+ self.recommendation_run_kwargs = recommendation_run_kwargs or {}
+ self.wait_for_completion = wait_for_completion
+ self.waiter_delay = waiter_delay
+ self.waiter_max_attempts = waiter_max_attempts
+ self.deferrable = deferrable
+ self.aws_conn_id = aws_conn_id
+
+ def execute(self, context: Context) -> str:
+ glue_table = self.datasource.get("GlueTable", {})
+
+ if not glue_table.get("DatabaseName") or not
glue_table.get("TableName"):
+ raise AttributeError("DataSource glue table must have DatabaseName
and TableName")
+
+ self.log.info("Submitting AWS Glue data quality recommendation run
with %s", self.datasource)
+
+ try:
+            response = self.hook.conn.start_data_quality_rule_recommendation_run(
+ DataSource=self.datasource,
+ Role=self.role,
+ NumberOfWorkers=self.number_of_workers,
+ Timeout=self.timeout,
+ **self.recommendation_run_kwargs,
+ )
+ except ClientError as error:
+ raise AirflowException(
+ f"AWS Glue data quality recommendation run failed:
{error.response['Error']['Message']}"
+ )
+
+ recommendation_run_id = response["RunId"]
+
+ message_description = (
+ f"AWS Glue data quality recommendation run RunId:
{recommendation_run_id} to complete."
+ )
+ if self.deferrable:
+ self.log.info("Deferring %s", message_description)
+ self.defer(
+ trigger=GlueDataQualityRuleRecommendationRunCompleteTrigger(
+ recommendation_run_id=recommendation_run_id,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ aws_conn_id=self.aws_conn_id,
+ ),
+ method_name="execute_complete",
+ )
+
+ elif self.wait_for_completion:
+ self.log.info("Waiting for %s", message_description)
+
+            self.hook.get_waiter("data_quality_rule_recommendation_run_complete").wait(
+                RunId=recommendation_run_id,
+                WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": self.waiter_max_attempts},
+ )
+ self.log.info(
+ "AWS Glue data quality recommendation run completed RunId:
%s", recommendation_run_id
+ )
+
+ if self.show_results:
+                self.hook.log_recommendation_results(run_id=recommendation_run_id)
+
+ else:
+ self.log.info(message_description)
+
+ return recommendation_run_id
+
+    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> str:
+ event = validate_execute_complete_event(event)
+
+ if event["status"] != "success":
+ raise AirflowException(f"Error: AWS Glue data quality rule
recommendation run: {event}")
+
+ if self.show_results:
+            self.hook.log_recommendation_results(run_id=event["recommendation_run_id"])
+
+ return event["recommendation_run_id"]
diff --git a/airflow/providers/amazon/aws/sensors/glue.py b/airflow/providers/amazon/aws/sensors/glue.py
index 76d6cb9d94..8493c2fd4a 100644
--- a/airflow/providers/amazon/aws/sensors/glue.py
+++ b/airflow/providers/amazon/aws/sensors/glue.py
@@ -24,7 +24,10 @@ from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook, GlueJobHook
from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
-from airflow.providers.amazon.aws.triggers.glue import GlueDataQualityRuleSetEvaluationRunCompleteTrigger
+from airflow.providers.amazon.aws.triggers.glue import (
+ GlueDataQualityRuleRecommendationRunCompleteTrigger,
+ GlueDataQualityRuleSetEvaluationRunCompleteTrigger,
+)
from airflow.providers.amazon.aws.utils import validate_execute_complete_event
from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
from airflow.sensors.base import BaseSensorOperator
@@ -226,3 +229,123 @@ class GlueDataQualityRuleSetEvaluationRunSensor(AwsBaseSensor[GlueDataQualityHoo
raise AirflowException(job_error_message)
else:
return False
+
+
+class GlueDataQualityRuleRecommendationRunSensor(AwsBaseSensor[GlueDataQualityHook]):
+ """
+    Waits for an AWS Glue data quality recommendation run to reach any of the statuses below.
+
+ 'FAILED', 'STOPPED', 'STOPPING', 'TIMEOUT', 'SUCCEEDED'
+
+ .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+ :ref:`howto/sensor:GlueDataQualityRuleRecommendationRunSensor`
+
+    :param recommendation_run_id: The AWS Glue data quality rule recommendation run identifier.
+    :param show_results: Displays the recommended ruleset (a set of rules) when the recommendation run completes. (default: True)
+    :param deferrable: If True, the sensor will operate in deferrable mode. This mode requires the
+        aiobotocore module to be installed.
+        (default: False, but can be overridden in config file by setting default_deferrable to True)
+    :param poke_interval: Polling period in seconds to check for the status of the job. (default: 120)
+    :param max_retries: Number of times before returning the current state. (default: 60)
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param verify: Whether to verify SSL certificates. See:
+        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
+        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+ """
+
+ SUCCESS_STATES = ("SUCCEEDED",)
+
+ FAILURE_STATES = ("FAILED", "STOPPED", "STOPPING", "TIMEOUT")
+
+ aws_hook_class = GlueDataQualityHook
+    template_fields: Sequence[str] = aws_template_fields("recommendation_run_id")
+
+ def __init__(
+ self,
+ *,
+ recommendation_run_id: str,
+ show_results: bool = True,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+ poke_interval: int = 120,
+ max_retries: int = 60,
+ aws_conn_id: str | None = "aws_default",
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.recommendation_run_id = recommendation_run_id
+ self.show_results = show_results
+ self.deferrable = deferrable
+ self.poke_interval = poke_interval
+ self.max_retries = max_retries
+ self.aws_conn_id = aws_conn_id
+
+ def execute(self, context: Context) -> Any:
+ if self.deferrable:
+ self.defer(
+ trigger=GlueDataQualityRuleRecommendationRunCompleteTrigger(
+ recommendation_run_id=self.recommendation_run_id,
+ waiter_delay=int(self.poke_interval),
+ waiter_max_attempts=self.max_retries,
+ aws_conn_id=self.aws_conn_id,
+ ),
+ method_name="execute_complete",
+ )
+ else:
+ super().execute(context=context)
+
+    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
+ event = validate_execute_complete_event(event)
+
+ if event["status"] != "success":
+ message = f"Error: AWS Glue data quality recommendation run:
{event}"
+ if self.soft_fail:
+ raise AirflowSkipException(message)
+ raise AirflowException(message)
+
+ if self.show_results:
+            self.hook.log_recommendation_results(run_id=self.recommendation_run_id)
+
+ self.log.info("AWS Glue data quality recommendation run completed.")
+
+ def poke(self, context: Context) -> bool:
+ self.log.info(
+ "Poking for AWS Glue data quality recommendation run RunId: %s",
self.recommendation_run_id
+ )
+
+        response = self.hook.conn.get_data_quality_rule_recommendation_run(RunId=self.recommendation_run_id)
+
+ status = response.get("Status")
+
+ if status in self.SUCCESS_STATES:
+ if self.show_results:
+                self.hook.log_recommendation_results(run_id=self.recommendation_run_id)
+
+ self.log.info(
+ "AWS Glue data quality recommendation run completed RunId: %s
Run State: %s",
+ self.recommendation_run_id,
+ response["Status"],
+ )
+
+ return True
+
+ elif status in self.FAILURE_STATES:
+ job_error_message = (
+ f"Error: AWS Glue data quality recommendation run RunId:
{self.recommendation_run_id} Run "
+ f"Status: {status}"
+ f": {response.get('ErrorString')}"
+ )
+ self.log.info(job_error_message)
+            # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
+ if self.soft_fail:
+ raise AirflowSkipException(job_error_message)
+ raise AirflowException(job_error_message)
+ else:
+ return False
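
A minimal sketch pairing the new sensor with the operator above; the run ID is
usually pulled from XCom via the operator's return value:

    from airflow.providers.amazon.aws.sensors.glue import (
        GlueDataQualityRuleRecommendationRunSensor,
    )

    await_recommendation_run = GlueDataQualityRuleRecommendationRunSensor(
        task_id="await_recommendation_run",
        # XCom output of the operator sketched in the previous hunk
        recommendation_run_id=recommendation_run.output,
    )
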
diff --git a/airflow/providers/amazon/aws/triggers/glue.py b/airflow/providers/amazon/aws/triggers/glue.py
index 1411955752..1af5060176 100644
--- a/airflow/providers/amazon/aws/triggers/glue.py
+++ b/airflow/providers/amazon/aws/triggers/glue.py
@@ -187,3 +187,38 @@ class GlueDataQualityRuleSetEvaluationRunCompleteTrigger(AwsBaseWaiterTrigger):
def hook(self) -> AwsGenericHook:
return GlueDataQualityHook(aws_conn_id=self.aws_conn_id)
+
+
+class GlueDataQualityRuleRecommendationRunCompleteTrigger(AwsBaseWaiterTrigger):
+ """
+    Trigger when an AWS Glue data quality recommendation run is complete.
+
+    :param recommendation_run_id: The AWS Glue data quality rule recommendation run identifier.
+    :param waiter_delay: The amount of time in seconds to wait between attempts. (default: 60)
+    :param waiter_max_attempts: The maximum number of attempts to be made. (default: 75)
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ """
+
+ def __init__(
+ self,
+ recommendation_run_id: str,
+ waiter_delay: int = 60,
+ waiter_max_attempts: int = 75,
+ aws_conn_id: str | None = "aws_default",
+ ):
+ super().__init__(
+ serialized_fields={"recommendation_run_id": recommendation_run_id},
+ waiter_name="data_quality_rule_recommendation_run_complete",
+ waiter_args={"RunId": recommendation_run_id},
+ failure_message="AWS Glue data quality recommendation run failed.",
+            status_message="Status of AWS Glue data quality recommendation run is",
+ status_queries=["Status"],
+ return_key="recommendation_run_id",
+ return_value=recommendation_run_id,
+ waiter_delay=waiter_delay,
+ waiter_max_attempts=waiter_max_attempts,
+ aws_conn_id=aws_conn_id,
+ )
+
+ def hook(self) -> AwsGenericHook:
+ return GlueDataQualityHook(aws_conn_id=self.aws_conn_id)
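
For reference, this mirrors how the operator defers onto the new trigger when
deferrable=True (the run ID is a hypothetical placeholder):

    self.defer(
        trigger=GlueDataQualityRuleRecommendationRunCompleteTrigger(
            recommendation_run_id="dqrun-0123456789abcdef",
            waiter_delay=60,
            waiter_max_attempts=75,
            aws_conn_id="aws_default",
        ),
        method_name="execute_complete",
    )
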
diff --git a/airflow/providers/amazon/aws/waiters/glue.json b/airflow/providers/amazon/aws/waiters/glue.json
index f9a2e4f133..2fb355809d 100644
--- a/airflow/providers/amazon/aws/waiters/glue.json
+++ b/airflow/providers/amazon/aws/waiters/glue.json
@@ -74,6 +74,55 @@
"state": "success"
}
]
+ },
+ "data_quality_rule_recommendation_run_complete": {
+ "operation": "GetDataQualityRuleRecommendationRun",
+ "delay": 60,
+ "maxAttempts": 75,
+ "acceptors": [
+ {
+ "matcher": "path",
+ "argument": "Status",
+ "expected": "STARTING",
+ "state": "retry"
+ },
+ {
+ "matcher": "path",
+ "argument": "Status",
+ "expected": "RUNNING",
+ "state": "retry"
+ },
+ {
+ "matcher": "path",
+ "argument": "Status",
+ "expected": "STOPPING",
+ "state": "failure"
+ },
+ {
+ "matcher": "path",
+ "argument": "Status",
+ "expected": "STOPPED",
+ "state": "failure"
+ },
+ {
+ "matcher": "path",
+ "argument": "Status",
+ "expected": "FAILED",
+ "state": "failure"
+ },
+ {
+ "matcher": "path",
+ "argument": "Status",
+ "expected": "TIMEOUT",
+ "state": "failure"
+ },
+ {
+ "matcher": "path",
+ "argument": "Status",
+ "expected": "SUCCEEDED",
+ "state": "success"
+ }
+ ]
}
}
}
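
The new waiter can also be driven directly through the hook, which is what the
operator does when wait_for_completion is True (placeholder run ID):

    from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook

    GlueDataQualityHook().get_waiter("data_quality_rule_recommendation_run_complete").wait(
        RunId="dqrun-0123456789abcdef",
        WaiterConfig={"Delay": 60, "MaxAttempts": 75},
    )
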
diff --git a/docs/apache-airflow-providers-amazon/operators/glue.rst b/docs/apache-airflow-providers-amazon/operators/glue.rst
index c53e84ccd3..024d6708b6 100644
--- a/docs/apache-airflow-providers-amazon/operators/glue.rst
+++ b/docs/apache-airflow-providers-amazon/operators/glue.rst
@@ -99,6 +99,20 @@ To start a AWS Glue Data Quality ruleset evaluation run you can use
:start-after: [START howto_operator_glue_data_quality_ruleset_evaluation_run_operator]
:end-before: [END howto_operator_glue_data_quality_ruleset_evaluation_run_operator]
+.. _howto/operator:GlueDataQualityRuleRecommendationRunOperator:
+
+Start an AWS Glue Data Quality Recommendation Run
+=================================================
+
+To start an AWS Glue Data Quality rule recommendation run you can use
+:class:`~airflow.providers.amazon.aws.operators.glue.GlueDataQualityRuleRecommendationRunOperator`.
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_glue_data_quality_with_recommendation.py
+ :language: python
+ :dedent: 4
+    :start-after: [START howto_operator_glue_data_quality_rule_recommendation_run]
+ :end-before: [END howto_operator_glue_data_quality_rule_recommendation_run]
+
Sensors
-------
@@ -144,6 +158,20 @@ reaches a terminal state you can use :class:`~airflow.providers.amazon.aws.senso
:start-after: [START howto_sensor_glue_data_quality_ruleset_evaluation_run]
:end-before: [END howto_sensor_glue_data_quality_ruleset_evaluation_run]
+.. _howto/sensor:GlueDataQualityRuleRecommendationRunSensor:
+
+Wait on an AWS Glue Data Quality Recommendation Run
+====================================================
+
+To wait on the state of an AWS Glue Data Quality recommendation run until it
+reaches a terminal state you can use :class:`~airflow.providers.amazon.aws.sensors.glue.GlueDataQualityRuleRecommendationRunSensor`
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_glue_data_quality_with_recommendation.py
+ :language: python
+ :dedent: 4
+    :start-after: [START howto_sensor_glue_data_quality_rule_recommendation_run]
+ :end-before: [END howto_sensor_glue_data_quality_rule_recommendation_run]
+
Reference
---------
diff --git a/tests/providers/amazon/aws/hooks/test_glue.py b/tests/providers/amazon/aws/hooks/test_glue.py
index cdcef07d9a..de1de82273 100644
--- a/tests/providers/amazon/aws/hooks/test_glue.py
+++ b/tests/providers/amazon/aws/hooks/test_glue.py
@@ -617,3 +617,24 @@ class TestGlueDataQualityHook:
assert caplog.messages == [
"AWS Glue data quality ruleset evaluation run, total number of
rules failed: 1"
]
+
+ @mock.patch.object(GlueDataQualityHook, "conn")
+    def test_log_recommendation_results(self, glue_data_quality_hook_mock_conn, caplog):
+ rules = """ Rules = [
+ RowCount between 2 and 8,
+ IsComplete "name"
+ ]
+ """
+        glue_data_quality_hook_mock_conn.get_data_quality_rule_recommendation_run.return_value = {
+ "RunId": self.RUN_ID,
+ "DataSource": {"GlueTable": {"DatabaseName": "TestDB",
"TableName": "TestTable"}},
+ "RecommendedRuleset": rules,
+ }
+
+ with caplog.at_level(logging.INFO, logger=self.glue.log.name):
+ self.glue.log_recommendation_results(run_id=self.RUN_ID)
+
+        glue_data_quality_hook_mock_conn.get_data_quality_rule_recommendation_run.assert_called_once_with(
+ RunId=self.RUN_ID
+ )
+ assert rules in caplog.messages
diff --git a/tests/providers/amazon/aws/operators/test_glue.py b/tests/providers/amazon/aws/operators/test_glue.py
index e5beef2bb6..6dbc5de1ce 100644
--- a/tests/providers/amazon/aws/operators/test_glue.py
+++ b/tests/providers/amazon/aws/operators/test_glue.py
@@ -19,6 +19,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Generator
from unittest import mock
+import boto3
import pytest
from boto3 import client
from moto import mock_aws
@@ -29,6 +30,7 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.links.glue import GlueJobRunDetailsLink
from airflow.providers.amazon.aws.operators.glue import (
GlueDataQualityOperator,
+ GlueDataQualityRuleRecommendationRunOperator,
GlueDataQualityRuleSetEvaluationRunOperator,
GlueJobOperator,
)
@@ -533,3 +535,109 @@ class TestGlueDataQualityRuleSetEvaluationRunOperator:
assert response == self.RUN_ID
assert glue_data_quality_hook.get_waiter.call_count == wait_for_completion
assert self.operator.defer.call_count == deferrable
+
+
+class TestGlueDataQualityRuleRecommendationRunOperator:
+ RUN_ID = "1234567890"
+    DATA_SOURCE = {"GlueTable": {"DatabaseName": "TestDB", "TableName": "TestTable"}}
+ ROLE = "role_arn"
+
+ @pytest.fixture
+ def mock_conn(self) -> Generator[BaseAwsConnection, None, None]:
+ with mock.patch.object(GlueDataQualityHook, "conn") as _conn:
+            _conn.start_data_quality_rule_recommendation_run.return_value = {"RunId": self.RUN_ID}
+ yield _conn
+
+ @pytest.fixture
+    def glue_data_quality_hook(self) -> Generator[GlueDataQualityHook, None, None]:
+ with mock_aws():
+ hook = GlueDataQualityHook(aws_conn_id="aws_default")
+ yield hook
+
+ def setup_method(self):
+ self.operator = GlueDataQualityRuleRecommendationRunOperator(
+ task_id="start_recommendation_run",
+ datasource=self.DATA_SOURCE,
+ role=self.ROLE,
+ show_results=False,
+ recommendation_run_kwargs={"CreatedRulesetName": "test-ruleset"},
+ )
+ self.operator.defer = mock.MagicMock()
+
+ def test_init(self):
+ assert self.operator.datasource == self.DATA_SOURCE
+ assert self.operator.role == self.ROLE
+ assert self.operator.show_results is False
+        assert self.operator.recommendation_run_kwargs == {"CreatedRulesetName": "test-ruleset"}
+
+ @mock.patch.object(GlueDataQualityHook, "conn")
+    def test_start_data_quality_rule_recommendation_run(self, glue_data_quality_mock_conn):
+ self.op = GlueDataQualityRuleRecommendationRunOperator(
+ task_id="start_recommendation_run",
+ datasource=self.DATA_SOURCE,
+ role=self.ROLE,
+ number_of_workers=10,
+ timeout=1000,
+ recommendation_run_kwargs={"CreatedRulesetName": "test-ruleset"},
+ )
+
+ self.op.wait_for_completion = False
+ self.op.execute({})
+
+        glue_data_quality_mock_conn.start_data_quality_rule_recommendation_run.assert_called_once_with(
+ DataSource=self.DATA_SOURCE,
+ Role=self.ROLE,
+ NumberOfWorkers=10,
+ Timeout=1000,
+ CreatedRulesetName="test-ruleset",
+ )
+
+ @mock.patch.object(GlueDataQualityHook, "conn")
+    def test_start_data_quality_rule_recommendation_run_failed(self, glue_data_quality_mock_conn):
+ created_ruleset_name = "test-ruleset"
+ error_message = f"Ruleset {created_ruleset_name} already exists"
+
+        err_response = {"Error": {"Code": "InvalidInputException", "Message": error_message}}
+
+ exception = boto3.client("glue").exceptions.ClientError(
+ err_response, "StartDataQualityRuleRecommendationRun"
+ )
+ returned_exception = type(exception)
+
+        glue_data_quality_mock_conn.exceptions.InvalidInputException = returned_exception
+        glue_data_quality_mock_conn.start_data_quality_rule_recommendation_run.side_effect = exception
+
+ operator = GlueDataQualityRuleRecommendationRunOperator(
+ task_id="stat_recommendation_run",
+ datasource=self.DATA_SOURCE,
+ role=self.ROLE,
+ recommendation_run_kwargs={"CreatedRulesetName":
created_ruleset_name},
+ )
+ operator.wait_for_completion = False
+
+ with pytest.raises(
+ AirflowException,
+ match=f"AWS Glue data quality recommendation run failed: Ruleset
{created_ruleset_name} already exists",
+ ):
+ operator.execute({})
+
+ @pytest.mark.parametrize(
+ "wait_for_completion, deferrable",
+ [
+ pytest.param(False, False, id="no_wait"),
+ pytest.param(True, False, id="wait"),
+ pytest.param(False, True, id="defer"),
+ ],
+ )
+ @mock.patch.object(GlueDataQualityHook, "get_waiter")
+ def test_start_data_quality_rule_recommendation_run_wait_combinations(
+        self, _, wait_for_completion, deferrable, mock_conn, glue_data_quality_hook
+ ):
+ self.operator.wait_for_completion = wait_for_completion
+ self.operator.deferrable = deferrable
+
+ response = self.operator.execute({})
+
+ assert response == self.RUN_ID
+        assert glue_data_quality_hook.get_waiter.call_count == wait_for_completion
+ assert self.operator.defer.call_count == deferrable
diff --git a/tests/providers/amazon/aws/sensors/test_glue_data_quality.py b/tests/providers/amazon/aws/sensors/test_glue_data_quality.py
index 0051b49b62..a37bc0b700 100644
--- a/tests/providers/amazon/aws/sensors/test_glue_data_quality.py
+++ b/tests/providers/amazon/aws/sensors/test_glue_data_quality.py
@@ -22,7 +22,10 @@ import pytest
from airflow.exceptions import AirflowException, AirflowSkipException, TaskDeferred
from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook
-from airflow.providers.amazon.aws.sensors.glue import GlueDataQualityRuleSetEvaluationRunSensor
+from airflow.providers.amazon.aws.sensors.glue import (
+ GlueDataQualityRuleRecommendationRunSensor,
+ GlueDataQualityRuleSetEvaluationRunSensor,
+)
SAMPLE_RESPONSE_GET_DATA_QUALITY_EVALUATION_RUN_SUCCEEDED = {
"RunId": "12345",
@@ -65,6 +68,22 @@ SAMPLE_RESPONSE_GET_DATA_QUALITY_RESULT = {
],
}
+RULES = """
+ Rules = [
+ RowCount between 2 and 8,
+ IsComplete "name"
+ ]
+ """
+
+SAMPLE_RESPONSE_GET_DATA_RULE_RECOMMENDATION_RUN_SUCCEEDED = {
+ "RunId": "12345",
+ "Status": "SUCCEEDED",
+ "DataSource": {"GlueTable": {"DatabaseName": "TestDB", "TableName":
"TestTable"}},
+ "RecommendedRuleset": RULES,
+}
+
+SAMPLE_RESPONSE_DATA_RULE_RECOMMENDATION_RUN_RUNNING = {"RunId": "12345", "Status": "RUNNING"}
+
class TestGlueDataQualityRuleSetEvaluationRunSensor:
SENSOR = GlueDataQualityRuleSetEvaluationRunSensor
@@ -180,3 +199,120 @@ class TestGlueDataQualityRuleSetEvaluationRunSensor:
event = {"status": "failure"}
with pytest.raises(AirflowException):
op.execute_complete(context={}, event=event)
+
+
+class TestGlueDataQualityRuleRecommendationRunSensor:
+ SENSOR = GlueDataQualityRuleRecommendationRunSensor
+
+ def setup_method(self):
+ self.default_args = dict(
+ task_id="test_data_quality_rule_recommendation_sensor",
+ recommendation_run_id="12345",
+ poke_interval=5,
+ max_retries=0,
+ )
+ self.sensor = self.SENSOR(**self.default_args, aws_conn_id=None)
+
+ def test_base_aws_op_attributes(self):
+ op = self.SENSOR(**self.default_args)
+ assert op.hook.aws_conn_id == "aws_default"
+ assert op.hook._region_name is None
+ assert op.hook._verify is None
+ assert op.hook._config is None
+
+ op = self.SENSOR(
+ **self.default_args,
+ aws_conn_id="aws-test-custom-conn",
+ region_name="eu-west-1",
+ verify=False,
+ botocore_config={"read_timeout": 42},
+ )
+ assert op.hook.aws_conn_id == "aws-test-custom-conn"
+ assert op.hook._region_name == "eu-west-1"
+ assert op.hook._verify is False
+ assert op.hook._config is not None
+ assert op.hook._config.read_timeout == 42
+
+ @mock.patch.object(GlueDataQualityHook, "conn")
+ def test_poke_success_state(self, mock_conn):
+ mock_conn.get_data_quality_rule_recommendation_run.return_value = (
+ SAMPLE_RESPONSE_GET_DATA_RULE_RECOMMENDATION_RUN_SUCCEEDED
+ )
+ assert self.sensor.poke({}) is True
+
+ @mock.patch.object(GlueDataQualityHook, "conn")
+ def test_poke_intermediate_state(self, mock_conn):
+ mock_conn.get_data_quality_rule_recommendation_run.return_value = (
+ SAMPLE_RESPONSE_DATA_RULE_RECOMMENDATION_RUN_RUNNING
+ )
+ assert self.sensor.poke({}) is False
+
+ @pytest.mark.parametrize(
+ "soft_fail, expected_exception",
+ [
+ pytest.param(False, AirflowException, id="not-soft-fail"),
+ pytest.param(True, AirflowSkipException, id="soft-fail"),
+ ],
+ )
+ @pytest.mark.parametrize("state", SENSOR.FAILURE_STATES)
+ @mock.patch.object(GlueDataQualityHook, "conn")
+    def test_poke_failure_states(self, mock_conn, state, soft_fail, expected_exception):
+ mock_conn.get_data_quality_rule_recommendation_run.return_value = {
+ "RunId": "12345",
+ "Status": state,
+ "ErrorString": "unknown error",
+ }
+
+        sensor = self.SENSOR(**self.default_args, aws_conn_id=None, soft_fail=soft_fail)
+
+ message = (
+ f"Error: AWS Glue data quality recommendation run RunId: 12345 Run
Status: {state}: unknown error"
+ )
+
+ with pytest.raises(expected_exception, match=message):
+ sensor.poke({})
+
+        mock_conn.get_data_quality_rule_recommendation_run.assert_called_once_with(RunId="12345")
+
+ def test_sensor_defer(self):
+ """Test the execute method raise TaskDeferred if running sensor in
deferrable mode"""
+ sensor = GlueDataQualityRuleRecommendationRunSensor(
+ task_id="test_task",
+ poke_interval=0,
+ recommendation_run_id="12345",
+ aws_conn_id="aws_default",
+ deferrable=True,
+ )
+
+ with pytest.raises(TaskDeferred):
+ sensor.execute(context=None)
+
+ @mock.patch.object(GlueDataQualityHook, "conn")
+    def test_execute_complete_succeeds_if_status_in_succeeded_states(self, mock_conn, caplog):
+ mock_conn.get_data_quality_rule_recommendation_run.return_value = (
+ SAMPLE_RESPONSE_GET_DATA_RULE_RECOMMENDATION_RUN_SUCCEEDED
+ )
+
+ op = GlueDataQualityRuleRecommendationRunSensor(
+ task_id="test_data_quality_rule_recommendation_run_sensor",
+ recommendation_run_id="12345",
+ poke_interval=0,
+ aws_conn_id="aws_default",
+ deferrable=True,
+ )
+ event = {"status": "success", "recommendation_run_id": "12345"}
+ op.execute_complete(context={}, event=event)
+ assert "AWS Glue data quality recommendation run completed." in
caplog.messages
+
+ def test_execute_complete_fails_if_status_in_failure_states(self):
+ op = GlueDataQualityRuleRecommendationRunSensor(
+ task_id="test_data_quality_rule_recommendation_run_sensor",
+ recommendation_run_id="12345",
+ poke_interval=0,
+ aws_conn_id="aws_default",
+ deferrable=True,
+ )
+ event = {"status": "failure"}
+
+ with pytest.raises(AirflowException):
+ op.execute_complete(context={}, event=event)
diff --git a/tests/providers/amazon/aws/triggers/test_glue.py b/tests/providers/amazon/aws/triggers/test_glue.py
index 79dc3f5d2c..e05a772b7c 100644
--- a/tests/providers/amazon/aws/triggers/test_glue.py
+++ b/tests/providers/amazon/aws/triggers/test_glue.py
@@ -27,6 +27,7 @@ from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook, GlueJob
from airflow.providers.amazon.aws.hooks.glue_catalog import GlueCatalogHook
from airflow.providers.amazon.aws.triggers.glue import (
GlueCatalogPartitionTrigger,
+ GlueDataQualityRuleRecommendationRunCompleteTrigger,
GlueDataQualityRuleSetEvaluationRunCompleteTrigger,
GlueJobCompleteTrigger,
)
@@ -124,3 +125,30 @@ class TestGlueDataQualityEvaluationRunCompletedTrigger:
assert response == TriggerEvent({"status": "success", "evaluation_run_id": self.RUN_ID})
assert_expected_waiter_type(mock_get_waiter, self.EXPECTED_WAITER_NAME)
mock_get_waiter().wait.assert_called_once()
+
+
+class TestGlueDataQualityRuleRecommendationRunCompleteTrigger:
+ EXPECTED_WAITER_NAME = "data_quality_rule_recommendation_run_complete"
+ RUN_ID = "1234567890abc"
+
+ def test_serialization(self):
+ """Assert that arguments and classpath are correctly serialized."""
+        trigger = GlueDataQualityRuleRecommendationRunCompleteTrigger(recommendation_run_id=self.RUN_ID)
+ classpath, kwargs = trigger.serialize()
+        assert classpath == BASE_TRIGGER_CLASSPATH + "GlueDataQualityRuleRecommendationRunCompleteTrigger"
+ assert kwargs.get("recommendation_run_id") == self.RUN_ID
+
+ @pytest.mark.asyncio
+ @mock.patch.object(GlueDataQualityHook, "get_waiter")
+ @mock.patch.object(GlueDataQualityHook, "async_conn")
+ async def test_run_success(self, mock_async_conn, mock_get_waiter):
+ mock_async_conn.__aenter__.return_value = mock.MagicMock()
+ mock_get_waiter().wait = AsyncMock()
+        trigger = GlueDataQualityRuleRecommendationRunCompleteTrigger(recommendation_run_id=self.RUN_ID)
+
+ generator = trigger.run()
+ response = await generator.asend(None)
+
+        assert response == TriggerEvent({"status": "success", "recommendation_run_id": self.RUN_ID})
+ assert_expected_waiter_type(mock_get_waiter, self.EXPECTED_WAITER_NAME)
+ mock_get_waiter().wait.assert_called_once()
diff --git a/tests/providers/amazon/aws/waiters/test_glue.py b/tests/providers/amazon/aws/waiters/test_glue.py
index 3ea119c96e..431082ee21 100644
--- a/tests/providers/amazon/aws/waiters/test_glue.py
+++ b/tests/providers/amazon/aws/waiters/test_glue.py
@@ -24,15 +24,18 @@ import pytest
from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook
from airflow.providers.amazon.aws.sensors.glue import (
+ GlueDataQualityRuleRecommendationRunSensor,
GlueDataQualityRuleSetEvaluationRunSensor,
)
class TestGlueDataQualityCustomWaiters:
- def test_service_waiters(self):
- print(GlueDataQualityHook().list_waiters())
+ def test_evaluation_run_waiters(self):
assert "data_quality_ruleset_evaluation_run_complete" in
GlueDataQualityHook().list_waiters()
+ def test_recommendation_run_waiters(self):
+ assert "data_quality_rule_recommendation_run_complete" in
GlueDataQualityHook().list_waiters()
+
class TestGlueDataQualityCustomWaitersBase:
@pytest.fixture(autouse=True)
@@ -70,3 +73,34 @@ class TestGlueDataQualityRuleSetEvaluationRunCompleteWaiter(TestGlueDataQualityC
GlueDataQualityHook().get_waiter(self.WAITER_NAME).wait(
RunIc="run_id", WaiterConfig={"Delay": 0.01, "MaxAttempts": 3}
)
+
+
+class TestGlueDataQualityRuleRecommendationRunCompleteWaiter(TestGlueDataQualityCustomWaitersBase):
+ WAITER_NAME = "data_quality_rule_recommendation_run_complete"
+
+ @pytest.fixture
+ def mock_get_job(self):
+        with mock.patch.object(self.client, "get_data_quality_rule_recommendation_run") as mock_getter:
+ yield mock_getter
+
+ @pytest.mark.parametrize("state",
GlueDataQualityRuleRecommendationRunSensor.SUCCESS_STATES)
+ def test_data_quality_rule_recommendation_run_complete(self, state,
mock_get_job):
+ mock_get_job.return_value = {"Status": state}
+
+ GlueDataQualityHook().get_waiter(self.WAITER_NAME).wait(RunId="run_id")
+
+ @pytest.mark.parametrize("state",
GlueDataQualityRuleRecommendationRunSensor.FAILURE_STATES)
+ def test_data_quality_rule_recommendation_run_failed(self, state,
mock_get_job):
+ mock_get_job.return_value = {"Status": state}
+
+ with pytest.raises(botocore.exceptions.WaiterError):
+            GlueDataQualityHook().get_waiter(self.WAITER_NAME).wait(RunId="run_id")
+
+ def test_data_quality_rule_recommendation_run_wait(self, mock_get_job):
+ wait = {"Status": "RUNNING"}
+ success = {"Status": "SUCCEEDED"}
+ mock_get_job.side_effect = [wait, wait, success]
+
+        GlueDataQualityHook().get_waiter(self.WAITER_NAME).wait(
+            RunId="run_id", WaiterConfig={"Delay": 0.01, "MaxAttempts": 3}
+        )
diff --git a/tests/system/providers/amazon/aws/example_glue_data_quality.py b/tests/system/providers/amazon/aws/example_glue_data_quality.py
index c0de3cda0a..15da75520d 100644
--- a/tests/system/providers/amazon/aws/example_glue_data_quality.py
+++ b/tests/system/providers/amazon/aws/example_glue_data_quality.py
@@ -85,8 +85,8 @@ def glue_data_quality_workflow():
role=test_context[ROLE_ARN_KEY],
rule_set_names=[rule_set_name],
)
- start_evaluation_run.wait_for_completion = False
# [END howto_operator_glue_data_quality_ruleset_evaluation_run_operator]
+ start_evaluation_run.wait_for_completion = False
# [START howto_sensor_glue_data_quality_ruleset_evaluation_run]
await_evaluation_run_sensor = GlueDataQualityRuleSetEvaluationRunSensor(
diff --git a/tests/system/providers/amazon/aws/example_glue_data_quality.py b/tests/system/providers/amazon/aws/example_glue_data_quality_with_recommendation.py
similarity index 81%
copy from tests/system/providers/amazon/aws/example_glue_data_quality.py
copy to tests/system/providers/amazon/aws/example_glue_data_quality_with_recommendation.py
index c0de3cda0a..c63d7a56f7 100644
--- a/tests/system/providers/amazon/aws/example_glue_data_quality.py
+++ b/tests/system/providers/amazon/aws/example_glue_data_quality_with_recommendation.py
@@ -24,7 +24,7 @@ from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.providers.amazon.aws.operators.glue import (
- GlueDataQualityOperator,
+ GlueDataQualityRuleRecommendationRunOperator,
GlueDataQualityRuleSetEvaluationRunOperator,
)
from airflow.providers.amazon.aws.operators.s3 import (
@@ -32,48 +32,48 @@ from airflow.providers.amazon.aws.operators.s3 import (
S3CreateObjectOperator,
S3DeleteBucketOperator,
)
-from airflow.providers.amazon.aws.sensors.glue import GlueDataQualityRuleSetEvaluationRunSensor
+from airflow.providers.amazon.aws.sensors.glue import (
+ GlueDataQualityRuleRecommendationRunSensor,
+ GlueDataQualityRuleSetEvaluationRunSensor,
+)
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.amazon.aws.utils import SystemTestContextBuilder
ROLE_ARN_KEY = "ROLE_ARN"
sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()
-DAG_ID = "example_glue_data_quality"
+DAG_ID = "example_glue_data_quality_with_recommendation"
SAMPLE_DATA = """"Alice",20
"Bob",25
"Charlie",30
"""
SAMPLE_FILENAME = "airflow_sample.csv"
-RULE_SET = """
-Rules = [
- RowCount between 2 and 8,
- IsComplete "name",
- Uniqueness "name" > 0.95,
- ColumnLength "name" between 3 and 14,
- ColumnValues "age" between 19 and 31
-]
-"""
-
@task_group
-def glue_data_quality_workflow():
- # [START howto_operator_glue_data_quality_operator]
- create_rule_set = GlueDataQualityOperator(
- task_id="create_rule_set",
- name=rule_set_name,
- ruleset=RULE_SET,
- data_quality_ruleset_kwargs={
- "TargetTable": {
+def glue_data_quality_recommendation_workflow():
+ # [START howto_operator_glue_data_quality_rule_recommendation_run]
+ recommendation_run = GlueDataQualityRuleRecommendationRunOperator(
+ task_id="recommendation_run",
+ datasource={
+ "GlueTable": {
"TableName": athena_table,
"DatabaseName": athena_database,
}
},
+ role=test_context[ROLE_ARN_KEY],
+ recommendation_run_kwargs={"CreatedRulesetName": rule_set_name},
)
- # [END howto_operator_glue_data_quality_operator]
+ # [END howto_operator_glue_data_quality_rule_recommendation_run]
+ recommendation_run.wait_for_completion = False
+
+ # [START howto_sensor_glue_data_quality_rule_recommendation_run]
+    await_recommendation_run_sensor = GlueDataQualityRuleRecommendationRunSensor(
+ task_id="await_recommendation_run_sensor",
+ recommendation_run_id=recommendation_run.output,
+ )
+ # [END howto_sensor_glue_data_quality_rule_recommendation_run]
- # [START howto_operator_glue_data_quality_ruleset_evaluation_run_operator]
start_evaluation_run = GlueDataQualityRuleSetEvaluationRunOperator(
task_id="start_evaluation_run",
datasource={
@@ -86,16 +86,15 @@ def glue_data_quality_workflow():
rule_set_names=[rule_set_name],
)
start_evaluation_run.wait_for_completion = False
- # [END howto_operator_glue_data_quality_ruleset_evaluation_run_operator]
- # [START howto_sensor_glue_data_quality_ruleset_evaluation_run]
await_evaluation_run_sensor = GlueDataQualityRuleSetEvaluationRunSensor(
task_id="await_evaluation_run_sensor",
evaluation_run_id=start_evaluation_run.output,
)
- # [END howto_sensor_glue_data_quality_ruleset_evaluation_run]
- chain(create_rule_set, start_evaluation_run, await_evaluation_run_sensor)
+ chain(
+        recommendation_run, await_recommendation_run_sensor, start_evaluation_run, await_evaluation_run_sensor
+ )
@task(trigger_rule=TriggerRule.ALL_DONE)
@@ -114,7 +113,7 @@ with DAG(
test_context = sys_test_context_task()
env_id = test_context["ENV_ID"]
- rule_set_name = f"{env_id}-system-test-ruleset"
+ rule_set_name = f"{env_id}-recommendation-ruleset"
s3_bucket = f"{env_id}-glue-dq-athena-bucket"
athena_table = f"{env_id}_test_glue_dq_table"
athena_database = f"{env_id}_glue_dq_default"
@@ -190,7 +189,7 @@ with DAG(
create_database,
create_table,
# TEST BODY
- glue_data_quality_workflow(),
+ glue_data_quality_recommendation_workflow(),
# TEST TEARDOWN
delete_ruleset(rule_set_name),
drop_table,