gopidesupavan commented on code in PR #40014:
URL: https://github.com/apache/airflow/pull/40014#discussion_r1625113446
##########
airflow/providers/amazon/aws/operators/glue.py:
##########
@@ -499,3 +500,149 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None)
         return event["evaluation_run_id"]
+
+
+class GlueDataQualityRuleRecommendationRunOperator(AwsBaseOperator[GlueDataQualityHook]):
+    """
+    Starts a recommendation run that is used to generate rules; Glue Data Quality analyzes the data and comes up with recommendations for a potential ruleset.
+
+    Recommendation runs are automatically deleted after 90 days.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GlueDataQualityRuleRecommendationRunOperator`
+
+    :param datasource: The data source (Glue table) associated with this run. (templated)
+    :param role: IAM role supplied for job execution. (templated)
+    :param number_of_workers: The number of G.1X workers to be used in the run. (default: 5)
+    :param timeout: The timeout for a run in minutes. This is the maximum time that a run can consume resources
+        before it is terminated and enters TIMEOUT status. (default: 2,880)
+    :param show_results: Displays the recommended ruleset (a set of rules) when the recommendation run completes. (default: True)
+    :param recommendation_run_kwargs: Extra arguments for the recommendation run. (templated)
+    :param wait_for_completion: Whether to wait for job to stop. (default: True)
+    :param waiter_delay: Time in seconds to wait between status checks. (default: 60)
+    :param waiter_max_attempts: Maximum number of attempts to check for job completion. (default: 20)
+    :param deferrable: If True, the operator will wait asynchronously for the job to stop.
+        This implies waiting for completion. This mode requires aiobotocore module to be installed.
+        (default: False)
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param verify: Whether or not to verify SSL certificates. See:
+        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
+        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+    """
+
+    aws_hook_class = GlueDataQualityHook
+    template_fields: Sequence[str] = (
+        "datasource",
+        "role",
+        "recommendation_run_kwargs",
+    )
+
+    template_fields_renderers = {"datasource": "json", "recommendation_run_kwargs": "json"}
+
+    ui_color = "#ededed"
+
+    def __init__(
+        self,
+        *,
+        datasource: dict,
+        role: str,
+        number_of_workers: int = 5,
+        timeout: int = 2880,
+        show_results: bool = True,
+        recommendation_run_kwargs: dict[str, Any] | None = None,
+        wait_for_completion: bool = True,
+        waiter_delay: int = 60,
+        waiter_max_attempts: int = 20,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.datasource = datasource
+        self.role = role
+        self.number_of_workers = number_of_workers
+        self.timeout = timeout
+        self.show_results = show_results
+        self.recommendation_run_kwargs = recommendation_run_kwargs or {}
+        self.wait_for_completion = wait_for_completion
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
+        self.deferrable = deferrable
+        self.aws_conn_id = aws_conn_id
+
+    def execute(self, context: Context) -> str:
+        glue_table = self.datasource.get("GlueTable", {})
+
+        if not glue_table.get("DatabaseName") or not glue_table.get("TableName"):
+            raise AttributeError("DataSource glue table must have DatabaseName and TableName")
+
+        self.log.info("Submitting AWS Glue data quality recommendation run with %s", self.datasource)
+
+        try:
+            response = self.hook.conn.start_data_quality_rule_recommendation_run(
+                DataSource=self.datasource,
+                Role=self.role,
+                NumberOfWorkers=self.number_of_workers,
+                Timeout=self.timeout,
+                **self.recommendation_run_kwargs,
+            )
+        except ClientError as error:
+            print(error)
Review Comment:
Good catch. Sorry, my bad, I left it there. I have removed it now. Thank you.
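
As a side note for anyone reading along, here is a minimal usage sketch of the new operator in a DAG. The database, table, and role names are placeholders, not values from this PR:

```python
# Illustrative only: the Glue database/table names and IAM role below are placeholders.
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueDataQualityRuleRecommendationRunOperator

with DAG(
    dag_id="example_glue_dq_recommendation",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    start_recommendation_run = GlueDataQualityRuleRecommendationRunOperator(
        task_id="start_recommendation_run",
        # The datasource must contain a GlueTable with DatabaseName and TableName,
        # otherwise execute() raises AttributeError.
        datasource={"GlueTable": {"DatabaseName": "my_database", "TableName": "my_table"}},
        role="my-glue-data-quality-role",
        number_of_workers=5,
        show_results=True,
        wait_for_completion=True,
    )
```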
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]