gopidesupavan commented on code in PR #40014:
URL: https://github.com/apache/airflow/pull/40014#discussion_r1625113446


##########
airflow/providers/amazon/aws/operators/glue.py:
##########
@@ -499,3 +500,149 @@ def execute_complete(self, context: Context, event: 
dict[str, Any] | None = None
         )
 
         return event["evaluation_run_id"]
+
+
+class 
GlueDataQualityRuleRecommendationRunOperator(AwsBaseOperator[GlueDataQualityHook]):
+    """
+    Starts a recommendation run that is used to generate rules, Glue Data 
Quality analyzes the data and comes up with recommendations for a potential 
ruleset.
+
+    Recommendation runs are automatically deleted after 90 days.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GlueDataQualityRuleRecommendationRunOperator`
+
+    :param datasource: The data source (Glue table) associated with this run. 
(templated)
+    :param role: IAM role supplied for job execution. (templated)
+    :param number_of_workers: The number of G.1X workers to be used in the 
run. (default: 5)
+    :param timeout: The timeout for a run in minutes. This is the maximum time 
that a run can consume resources
+        before it is terminated and enters TIMEOUT status. (default: 2,880)
+    :param show_results: Displays the recommended ruleset (a set of rules), 
when recommendation run completes. (default: True)
+    :param recommendation_run_kwargs: Extra arguments for recommendation run. 
(templated)
+    :param wait_for_completion: Whether to wait for job to stop. (default: 
True)
+    :param waiter_delay: Time in seconds to wait between status checks. 
(default: 60)
+    :param waiter_max_attempts: Maximum number of attempts to check for job 
completion. (default: 20)
+    :param deferrable: If True, the operator will wait asynchronously for the 
job to stop.
+        This implies waiting for completion. This mode requires aiobotocore 
module to be installed.
+        (default: False)
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. 
If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region_name: AWS region_name. If not specified then the default 
boto3 behaviour is used.
+    :param verify: Whether or not to verify SSL certificates. See:
+        
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+    :param botocore_config: Configuration dictionary (key-values) for botocore 
client. See:
+        
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+    """
+
+    aws_hook_class = GlueDataQualityHook
+    template_fields: Sequence[str] = (
+        "datasource",
+        "role",
+        "recommendation_run_kwargs",
+    )
+
+    template_fields_renderers = {"datasource": "json", 
"recommendation_run_kwargs": "json"}
+
+    ui_color = "#ededed"
+
+    def __init__(
+        self,
+        *,
+        datasource: dict,
+        role: str,
+        number_of_workers: int = 5,
+        timeout: int = 2880,
+        show_results: bool = True,
+        recommendation_run_kwargs: dict[str, Any] | None = None,
+        wait_for_completion: bool = True,
+        waiter_delay: int = 60,
+        waiter_max_attempts: int = 20,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.datasource = datasource
+        self.role = role
+        self.number_of_workers = number_of_workers
+        self.timeout = timeout
+        self.show_results = show_results
+        self.recommendation_run_kwargs = recommendation_run_kwargs or {}
+        self.wait_for_completion = wait_for_completion
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
+        self.deferrable = deferrable
+        self.aws_conn_id = aws_conn_id
+
+    def execute(self, context: Context) -> str:
+        glue_table = self.datasource.get("GlueTable", {})
+
+        if not glue_table.get("DatabaseName") or not 
glue_table.get("TableName"):
+            raise AttributeError("DataSource glue table must have DatabaseName 
and TableName")
+
+        self.log.info("Submitting AWS Glue data quality recommendation run 
with %s", self.datasource)
+
+        try:
+            response = 
self.hook.conn.start_data_quality_rule_recommendation_run(
+                DataSource=self.datasource,
+                Role=self.role,
+                NumberOfWorkers=self.number_of_workers,
+                Timeout=self.timeout,
+                **self.recommendation_run_kwargs,
+            )
+        except ClientError as error:
+            print(error)

Review Comment:
   Good catch. Sorry, my bad — I left it there. I have removed it now. Thank you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to