vincbeck commented on code in PR #40014:
URL: https://github.com/apache/airflow/pull/40014#discussion_r1625038924
##########
airflow/providers/amazon/aws/operators/glue.py:
##########
@@ -499,3 +500,149 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None
)
return event["evaluation_run_id"]
+
+
+class GlueDataQualityRuleRecommendationRunOperator(AwsBaseOperator[GlueDataQualityHook]):
+ """
+    Starts a recommendation run that is used to generate rules, Glue Data Quality analyzes the data and comes up with recommendations for a potential ruleset.
+
+ Recommendation runs are automatically deleted after 90 days.
+
+ .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:GlueDataQualityRuleRecommendationRunOperator`
+
+    :param datasource: The data source (Glue table) associated with this run. (templated)
+    :param role: IAM role supplied for job execution. (templated)
+    :param number_of_workers: The number of G.1X workers to be used in the run. (default: 5)
+    :param timeout: The timeout for a run in minutes. This is the maximum time that a run can consume resources
+        before it is terminated and enters TIMEOUT status. (default: 2,880)
+    :param show_results: Displays the recommended ruleset (a set of rules), when recommendation run completes. (default: True)
+    :param recommendation_run_kwargs: Extra arguments for recommendation run. (templated)
+    :param wait_for_completion: Whether to wait for job to stop. (default: True)
+    :param waiter_delay: Time in seconds to wait between status checks. (default: 60)
+    :param waiter_max_attempts: Maximum number of attempts to check for job completion. (default: 20)
+    :param deferrable: If True, the operator will wait asynchronously for the job to stop.
+        This implies waiting for completion. This mode requires aiobotocore module to be installed.
+        (default: False)
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param verify: Whether or not to verify SSL certificates. See:
+        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
+        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+ """
+
+ aws_hook_class = GlueDataQualityHook
+ template_fields: Sequence[str] = (
+ "datasource",
+ "role",
+ "recommendation_run_kwargs",
+ )
+
+    template_fields_renderers = {"datasource": "json", "recommendation_run_kwargs": "json"}
+
+ ui_color = "#ededed"
+
+ def __init__(
+ self,
+ *,
+ datasource: dict,
+ role: str,
+ number_of_workers: int = 5,
+ timeout: int = 2880,
+ show_results: bool = True,
+ recommendation_run_kwargs: dict[str, Any] | None = None,
+ wait_for_completion: bool = True,
+ waiter_delay: int = 60,
+ waiter_max_attempts: int = 20,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+ aws_conn_id: str | None = "aws_default",
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.datasource = datasource
+ self.role = role
+ self.number_of_workers = number_of_workers
+ self.timeout = timeout
+ self.show_results = show_results
+ self.recommendation_run_kwargs = recommendation_run_kwargs or {}
+ self.wait_for_completion = wait_for_completion
+ self.waiter_delay = waiter_delay
+ self.waiter_max_attempts = waiter_max_attempts
+ self.deferrable = deferrable
+ self.aws_conn_id = aws_conn_id
+
+ def execute(self, context: Context) -> str:
+ glue_table = self.datasource.get("GlueTable", {})
+
+        if not glue_table.get("DatabaseName") or not glue_table.get("TableName"):
+            raise AttributeError("DataSource glue table must have DatabaseName and TableName")
+
+        self.log.info("Submitting AWS Glue data quality recommendation run with %s", self.datasource)
+
+ try:
+            response = self.hook.conn.start_data_quality_rule_recommendation_run(
+ DataSource=self.datasource,
+ Role=self.role,
+ NumberOfWorkers=self.number_of_workers,
+ Timeout=self.timeout,
+ **self.recommendation_run_kwargs,
+ )
+ except ClientError as error:
+ print(error)
Review Comment:
Left over? Or if it is intended, we should log instead
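For example, something along these lines if logging is the intent (just a sketch, untested):
```suggestion
            self.log.error("AWS Glue data quality recommendation run failed: %s", error)
```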
##########
tests/system/providers/amazon/aws/example_glue_data_quality_with_recommendation.py:
##########
@@ -0,0 +1,209 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow import DAG
+from airflow.decorators import task, task_group
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook
+from airflow.providers.amazon.aws.operators.athena import AthenaOperator
+from airflow.providers.amazon.aws.operators.glue import (
+ GlueDataQualityRuleRecommendationRunOperator,
+ GlueDataQualityRuleSetEvaluationRunOperator,
+)
+from airflow.providers.amazon.aws.operators.s3 import (
+ S3CreateBucketOperator,
+ S3CreateObjectOperator,
+ S3DeleteBucketOperator,
+)
+from airflow.providers.amazon.aws.sensors.glue import (
+ GlueDataQualityRuleRecommendationRunSensor,
+ GlueDataQualityRuleSetEvaluationRunSensor,
+)
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import SystemTestContextBuilder
+
+ROLE_ARN_KEY = "ROLE_ARN"
+sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()
+
+DAG_ID = "example_glue_data_quality_recommendation_run"
Review Comment:
We usually set the DAG ID as the filename
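Something like this, assuming we keep the current filename:
```suggestion
DAG_ID = "example_glue_data_quality_with_recommendation"
```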
##########
airflow/providers/amazon/aws/operators/glue.py:
##########
@@ -499,3 +500,149 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None
)
return event["evaluation_run_id"]
+
+
+class GlueDataQualityRuleRecommendationRunOperator(AwsBaseOperator[GlueDataQualityHook]):
+ """
+    Starts a recommendation run that is used to generate rules, Glue Data Quality analyzes the data and comes up with recommendations for a potential ruleset.
+
+ Recommendation runs are automatically deleted after 90 days.
+
+ .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:GlueDataQualityRuleRecommendationRunOperator`
+
+    :param datasource: The data source (Glue table) associated with this run. (templated)
+    :param role: IAM role supplied for job execution. (templated)
+    :param number_of_workers: The number of G.1X workers to be used in the run. (default: 5)
+    :param timeout: The timeout for a run in minutes. This is the maximum time that a run can consume resources
+        before it is terminated and enters TIMEOUT status. (default: 2,880)
+    :param show_results: Displays the recommended ruleset (a set of rules), when recommendation run completes. (default: True)
+    :param recommendation_run_kwargs: Extra arguments for recommendation run. (templated)
+    :param wait_for_completion: Whether to wait for job to stop. (default: True)
+    :param waiter_delay: Time in seconds to wait between status checks. (default: 60)
+    :param waiter_max_attempts: Maximum number of attempts to check for job completion. (default: 20)
+    :param deferrable: If True, the operator will wait asynchronously for the job to stop.
+        This implies waiting for completion. This mode requires aiobotocore module to be installed.
+        (default: False)
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param verify: Whether or not to verify SSL certificates. See:
+        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
+        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+ """
+
+ aws_hook_class = GlueDataQualityHook
+ template_fields: Sequence[str] = (
+ "datasource",
+ "role",
+ "recommendation_run_kwargs",
+ )
+
+    template_fields_renderers = {"datasource": "json", "recommendation_run_kwargs": "json"}
+
+ ui_color = "#ededed"
+
+ def __init__(
+ self,
+ *,
+ datasource: dict,
+ role: str,
+ number_of_workers: int = 5,
+ timeout: int = 2880,
+ show_results: bool = True,
+ recommendation_run_kwargs: dict[str, Any] | None = None,
+ wait_for_completion: bool = True,
+ waiter_delay: int = 60,
+ waiter_max_attempts: int = 20,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+ aws_conn_id: str | None = "aws_default",
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.datasource = datasource
+ self.role = role
+ self.number_of_workers = number_of_workers
+ self.timeout = timeout
+ self.show_results = show_results
+ self.recommendation_run_kwargs = recommendation_run_kwargs or {}
+ self.wait_for_completion = wait_for_completion
+ self.waiter_delay = waiter_delay
+ self.waiter_max_attempts = waiter_max_attempts
+ self.deferrable = deferrable
+ self.aws_conn_id = aws_conn_id
+
+ def execute(self, context: Context) -> str:
+ glue_table = self.datasource.get("GlueTable", {})
+
+        if not glue_table.get("DatabaseName") or not glue_table.get("TableName"):
+            raise AttributeError("DataSource glue table must have DatabaseName and TableName")
+
+        self.log.info("Submitting AWS Glue data quality recommendation run with %s", self.datasource)
+
+ try:
+            response = self.hook.conn.start_data_quality_rule_recommendation_run(
+ DataSource=self.datasource,
+ Role=self.role,
+ NumberOfWorkers=self.number_of_workers,
+ Timeout=self.timeout,
+ **self.recommendation_run_kwargs,
+ )
+ except ClientError as error:
+ print(error)
+            raise AirflowException(
+                f"AWS Glue data quality recommendation run failed: {error.response['Error']['Message']}"
+            )
+
+ recommendation_run_id = response["RunId"]
+
+        message_description = (
+            f"AWS Glue data quality recommendation run RunId: {recommendation_run_id} to complete."
+        )
+ if self.deferrable:
+ self.log.info("Deferring %s", message_description)
+ self.defer(
+ trigger=GlueDataQualityRuleRecommendationRunCompleteTrigger(
+ recommendation_run_id=recommendation_run_id,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ aws_conn_id=self.aws_conn_id,
+ ),
+ method_name="execute_complete",
+ )
+
+ elif self.wait_for_completion:
+ self.log.info("Waiting for %s", message_description)
+
+            self.hook.get_waiter("data_quality_rule_recommendation_run_complete").wait(
+ RunId=recommendation_run_id,
+                WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": self.waiter_max_attempts},
+ )
+            self.log.info(
+                "AWS Glue data quality recommendation run completed RunId: %s", recommendation_run_id
+            )
+
+ if self.show_results:
+                self.hook.log_recommendation_results(run_id=recommendation_run_id)
+
+ else:
+            self.log.info("AWS Glue data quality recommendation run runId: %s.", recommendation_run_id)
Review Comment:
```suggestion
self.log.info(message_description)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]