vincbeck commented on code in PR #27893:
URL: https://github.com/apache/airflow/pull/27893#discussion_r1038519007


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -92,10 +92,38 @@ def __init__(
         kwargs["client_type"] = "glue"
         super().__init__(*args, **kwargs)
 
+    def create_glue_job_config(self) -> dict:
+        if self.s3_bucket is None:
+            raise AirflowException("Could not initialize glue job, error: 
Specify Parameter `s3_bucket`")
+
+        default_command = {
+            "Name": "glueetl",

Review Comment:
   Should this name be provided by the user rather than hardcoded?



##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -92,10 +92,38 @@ def __init__(
         kwargs["client_type"] = "glue"
         super().__init__(*args, **kwargs)
 
+    def create_glue_job_config(self) -> dict:
+        if self.s3_bucket is None:
+            raise AirflowException("Could not initialize glue job, error: 
Specify Parameter `s3_bucket`")

Review Comment:
   Would `TypeError` make more sense here?



##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -92,10 +92,38 @@ def __init__(
         kwargs["client_type"] = "glue"
         super().__init__(*args, **kwargs)
 
+    def create_glue_job_config(self) -> dict:
+        if self.s3_bucket is None:
+            raise AirflowException("Could not initialize glue job, error: 
Specify Parameter `s3_bucket`")
+
+        default_command = {
+            "Name": "glueetl",
+            "ScriptLocation": self.script_location,
+        }
+        command = self.create_job_kwargs.pop("Command", default_command)
+
+        s3_log_path = 
f"s3://{self.s3_bucket}/{self.s3_glue_logs}{self.job_name}"
+        execution_role = self.get_iam_execution_role()
+
+        ret_config = {
+            "Name": self.job_name,
+            "Description": self.desc,
+            "LogUri": s3_log_path,
+            "Role": execution_role["Role"]["Arn"],

Review Comment:
   It is a pretty strong assumption that the user wants to use the execution
role as the job role, no? Shouldn't it be specified by the user?



##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -231,54 +256,72 @@ def job_completion(self, job_name: str, run_id: str, 
verbose: bool = False) -> d
                         next_token=next_log_token,
                     )
 
-    def get_or_create_glue_job(self) -> str:
+    def get_job(self, job_name) -> dict:

Review Comment:
   As a convention, we usually try to avoid having methods in hooks that are
just wrappers around boto3 APIs. To me, there is not much value in having a
`get_job` function in the hook, since you can directly use `hook.conn.get_job`.



##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -231,54 +256,72 @@ def job_completion(self, job_name: str, run_id: str, 
verbose: bool = False) -> d
                         next_token=next_log_token,
                     )
 
-    def get_or_create_glue_job(self) -> str:
+    def get_job(self, job_name) -> dict:
         """
-        Creates(or just returns) and returns the Job name
-        :return:Name of the Job
+        Gets job configurations
+
+        :param job_name: unique job name per AWS account
+        :return: Nested dictionary of job configurations
         """
-        glue_client = self.get_conn()
+        return self.get_conn().get_job(JobName=job_name)["Job"]
+
+    def has_job(self, job_name) -> bool:
+        """
+        Checks if the job already exists
+
+        :param job_name: unique job name per AWS account
+        :return: Returns True if the job already exists and False if not.
+        """
+        self.log.info("Checking if job already exists: %s", job_name)
+
         try:
-            get_job_response = glue_client.get_job(JobName=self.job_name)
-            self.log.info("Job Already exist. Returning Name of the job")
-            return get_job_response["Job"]["Name"]
-
-        except glue_client.exceptions.EntityNotFoundException:
-            self.log.info("Job doesn't exist. Now creating and running AWS 
Glue Job")
-            if self.s3_bucket is None:
-                raise AirflowException("Could not initialize glue job, error: 
Specify Parameter `s3_bucket`")
-            s3_log_path = 
f"s3://{self.s3_bucket}/{self.s3_glue_logs}{self.job_name}"
-            execution_role = self.get_iam_execution_role()
-            try:
-                default_command = {
-                    "Name": "glueetl",
-                    "ScriptLocation": self.script_location,
-                }
-                command = self.create_job_kwargs.pop("Command", 
default_command)
-
-                if "WorkerType" in self.create_job_kwargs and 
"NumberOfWorkers" in self.create_job_kwargs:
-                    create_job_response = glue_client.create_job(
-                        Name=self.job_name,
-                        Description=self.desc,
-                        LogUri=s3_log_path,
-                        Role=execution_role["Role"]["Arn"],
-                        ExecutionProperty={"MaxConcurrentRuns": 
self.concurrent_run_limit},
-                        Command=command,
-                        MaxRetries=self.retry_limit,
-                        **self.create_job_kwargs,
-                    )
-                else:
-                    create_job_response = glue_client.create_job(
-                        Name=self.job_name,
-                        Description=self.desc,
-                        LogUri=s3_log_path,
-                        Role=execution_role["Role"]["Arn"],
-                        ExecutionProperty={"MaxConcurrentRuns": 
self.concurrent_run_limit},
-                        Command=command,
-                        MaxRetries=self.retry_limit,
-                        MaxCapacity=self.num_of_dpus,
-                        **self.create_job_kwargs,
-                    )
-                return create_job_response["Name"]
-            except Exception as general_error:
-                self.log.error("Failed to create aws glue job, error: %s", 
general_error)
-                raise
+            self.get_job(job_name)
+            return True
+        except self.get_conn().exceptions.EntityNotFoundException:
+            return False
+
+    def update_job(self, **job_kwargs) -> bool:
+        """
+        Updates job configurations
+
+        :param job_kwargs: Keyword args that define the configurations used 
for the job
+        :return: True if job was updated and false otherwise
+        """
+        job_name = job_kwargs.pop("Name")
+        current_job = self.get_job(job_name)
+
+        update_config = {
+            key: value for key, value in job_kwargs.items() if 
current_job.get(key) != job_kwargs[key]
+        }
+        if update_config != {}:
+            self.log.info("Updating job: %s", job_name)
+            self.get_conn().update_job(JobName=job_name, JobUpdate=job_kwargs)
+            self.log.info("Updated configurations: %s", update_config)
+            return True
+        else:
+            return False
+
+    def create_job(self, **job_kwargs) -> str:

Review Comment:
   Same comment here: this function is just a wrapper around the boto3 `create_job`
API.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to