This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b609ab9001 AWSGlueJobHook updates job configuration if it exists (#27893)
b609ab9001 is described below

commit b609ab9001102b67a047b3078dc0b67fbafcc1e1
Author: Romain Ardiet <[email protected]>
AuthorDate: Tue Dec 6 14:29:22 2022 +0000

    AWSGlueJobHook updates job configuration if it exists (#27893)
    
    * AWSGlueJobOperator updates job configuration if it exists
---
 airflow/providers/amazon/aws/hooks/glue.py    | 136 ++++++++++++----------
 tests/providers/amazon/aws/hooks/test_glue.py | 155 +++++++++++++++++++-------
 2 files changed, 195 insertions(+), 96 deletions(-)

diff --git a/airflow/providers/amazon/aws/hooks/glue.py b/airflow/providers/amazon/aws/hooks/glue.py
index f318417412..eac304d750 100644
--- a/airflow/providers/amazon/aws/hooks/glue.py
+++ b/airflow/providers/amazon/aws/hooks/glue.py
@@ -92,10 +92,38 @@ class GlueJobHook(AwsBaseHook):
         kwargs["client_type"] = "glue"
         super().__init__(*args, **kwargs)
 
+    def create_glue_job_config(self) -> dict:
+        if self.s3_bucket is None:
+            raise ValueError("Could not initialize glue job, error: Specify 
Parameter `s3_bucket`")
+
+        default_command = {
+            "Name": "glueetl",
+            "ScriptLocation": self.script_location,
+        }
+        command = self.create_job_kwargs.pop("Command", default_command)
+
+        s3_log_path = 
f"s3://{self.s3_bucket}/{self.s3_glue_logs}{self.job_name}"
+        execution_role = self.get_iam_execution_role()
+
+        ret_config = {
+            "Name": self.job_name,
+            "Description": self.desc,
+            "LogUri": s3_log_path,
+            "Role": execution_role["Role"]["Arn"],
+            "ExecutionProperty": {"MaxConcurrentRuns": 
self.concurrent_run_limit},
+            "Command": command,
+            "MaxRetries": self.retry_limit,
+            **self.create_job_kwargs,
+        }
+
+        if hasattr(self, "num_of_dpus"):
+            ret_config["MaxCapacity"] = self.num_of_dpus
+
+        return ret_config
+
     def list_jobs(self) -> list:
         """:return: Lists of Jobs"""
-        conn = self.get_conn()
-        return conn.get_jobs()
+        return self.get_conn().get_jobs()
 
     def get_iam_execution_role(self) -> dict:
         """:return: iam role for job execution"""
@@ -120,14 +148,12 @@ class GlueJobHook(AwsBaseHook):
         to run job
         :return:
         """
-        glue_client = self.get_conn()
         script_arguments = script_arguments or {}
         run_kwargs = run_kwargs or {}
 
         try:
-            job_name = self.get_or_create_glue_job()
-            return glue_client.start_job_run(JobName=job_name, 
Arguments=script_arguments, **run_kwargs)
-
+            job_name = self.create_or_update_glue_job()
+            return self.get_conn().start_job_run(JobName=job_name, 
Arguments=script_arguments, **run_kwargs)
         except Exception as general_error:
             self.log.error("Failed to run aws glue job, error: %s", 
general_error)
             raise
@@ -140,8 +166,7 @@ class GlueJobHook(AwsBaseHook):
         :param run_id: The job-run ID of the predecessor job run
         :return: State of the Glue job
         """
-        glue_client = self.get_conn()
-        job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id, 
PredecessorsIncluded=True)
+        job_run = self.get_conn().get_job_run(JobName=job_name, RunId=run_id, 
PredecessorsIncluded=True)
         return job_run["JobRun"]["JobRunState"]
 
     def print_job_logs(
@@ -231,54 +256,53 @@ class GlueJobHook(AwsBaseHook):
                         next_token=next_log_token,
                     )
 
-    def get_or_create_glue_job(self) -> str:
+    def has_job(self, job_name) -> bool:
         """
-        Creates(or just returns) and returns the Job name
-        :return:Name of the Job
+        Checks if the job already exists
+
+        :param job_name: unique job name per AWS account
+        :return: Returns True if the job already exists and False if not.
         """
-        glue_client = self.get_conn()
+        self.log.info("Checking if job already exists: %s", job_name)
+
         try:
-            get_job_response = glue_client.get_job(JobName=self.job_name)
-            self.log.info("Job Already exist. Returning Name of the job")
-            return get_job_response["Job"]["Name"]
-
-        except glue_client.exceptions.EntityNotFoundException:
-            self.log.info("Job doesn't exist. Now creating and running AWS 
Glue Job")
-            if self.s3_bucket is None:
-                raise AirflowException("Could not initialize glue job, error: 
Specify Parameter `s3_bucket`")
-            s3_log_path = 
f"s3://{self.s3_bucket}/{self.s3_glue_logs}{self.job_name}"
-            execution_role = self.get_iam_execution_role()
-            try:
-                default_command = {
-                    "Name": "glueetl",
-                    "ScriptLocation": self.script_location,
-                }
-                command = self.create_job_kwargs.pop("Command", 
default_command)
-
-                if "WorkerType" in self.create_job_kwargs and 
"NumberOfWorkers" in self.create_job_kwargs:
-                    create_job_response = glue_client.create_job(
-                        Name=self.job_name,
-                        Description=self.desc,
-                        LogUri=s3_log_path,
-                        Role=execution_role["Role"]["Arn"],
-                        ExecutionProperty={"MaxConcurrentRuns": 
self.concurrent_run_limit},
-                        Command=command,
-                        MaxRetries=self.retry_limit,
-                        **self.create_job_kwargs,
-                    )
-                else:
-                    create_job_response = glue_client.create_job(
-                        Name=self.job_name,
-                        Description=self.desc,
-                        LogUri=s3_log_path,
-                        Role=execution_role["Role"]["Arn"],
-                        ExecutionProperty={"MaxConcurrentRuns": 
self.concurrent_run_limit},
-                        Command=command,
-                        MaxRetries=self.retry_limit,
-                        MaxCapacity=self.num_of_dpus,
-                        **self.create_job_kwargs,
-                    )
-                return create_job_response["Name"]
-            except Exception as general_error:
-                self.log.error("Failed to create aws glue job, error: %s", 
general_error)
-                raise
+            self.get_conn().get_job(JobName=job_name)
+            return True
+        except self.get_conn().exceptions.EntityNotFoundException:
+            return False
+
+    def update_job(self, **job_kwargs) -> bool:
+        """
+        Updates job configurations
+
+        :param job_kwargs: Keyword args that define the configurations used 
for the job
+        :return: True if job was updated and false otherwise
+        """
+        job_name = job_kwargs.pop("Name")
+        current_job = self.get_conn().get_job(JobName=job_name)["Job"]
+
+        update_config = {
+            key: value for key, value in job_kwargs.items() if 
current_job.get(key) != job_kwargs[key]
+        }
+        if update_config != {}:
+            self.log.info("Updating job: %s", job_name)
+            self.get_conn().update_job(JobName=job_name, JobUpdate=job_kwargs)
+            self.log.info("Updated configurations: %s", update_config)
+            return True
+        else:
+            return False
+
+    def create_or_update_glue_job(self) -> str | None:
+        """
+        Creates(or updates) and returns the Job name
+        :return:Name of the Job
+        """
+        config = self.create_glue_job_config()
+
+        if self.has_job(self.job_name):
+            self.update_job(**config)
+        else:
+            self.log.info("Creating job: %s", self.job_name)
+            self.get_conn().create_job(**config)
+
+        return self.job_name
diff --git a/tests/providers/amazon/aws/hooks/test_glue.py b/tests/providers/amazon/aws/hooks/test_glue.py
index 18e252a039..4c64728dc8 100644
--- a/tests/providers/amazon/aws/hooks/test_glue.py
+++ b/tests/providers/amazon/aws/hooks/test_glue.py
@@ -24,6 +24,7 @@ import boto3
 import pytest
 from moto import mock_glue, mock_iam
 
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
 
 
@@ -62,75 +63,149 @@ class TestGlueJobHook:
         assert "Arn" in iam_role["Role"]
         assert iam_role["Role"]["Arn"] == 
f"arn:aws:iam::123456789012:role{role_path}{expected_role}"
 
+    @mock.patch.object(AwsBaseHook, "get_conn")
+    def test_has_job_exists(self, mock_get_conn):
+        job_name = "aws_test_glue_job"
+        mock_get_conn.return_value.get_job.return_value = {"Job": {"Name": 
job_name}}
+
+        hook = GlueJobHook(aws_conn_id=None, job_name=job_name, 
s3_bucket="some_bucket")
+        result = hook.has_job(job_name)
+        assert result is True
+        
mock_get_conn.return_value.get_job.assert_called_once_with(JobName=hook.job_name)
+
+    @mock.patch.object(AwsBaseHook, "get_conn")
+    def test_has_job_job_doesnt_exists(self, mock_get_conn):
+        class JobNotFoundException(Exception):
+            pass
+
+        mock_get_conn.return_value.exceptions.EntityNotFoundException = 
JobNotFoundException
+        mock_get_conn.return_value.get_job.side_effect = JobNotFoundException()
+
+        job_name = "aws_test_glue_job"
+        hook = GlueJobHook(aws_conn_id=None, job_name=job_name, 
s3_bucket="some_bucket")
+        result = hook.has_job(job_name)
+        assert result is False
+        
mock_get_conn.return_value.get_job.assert_called_once_with(JobName=job_name)
+
+    @mock.patch.object(GlueJobHook, "get_iam_execution_role")
     @mock.patch.object(GlueJobHook, "get_conn")
-    def test_get_or_create_glue_job_get_existing_job(self, mock_get_conn):
+    def test_create_or_update_glue_job_create_new_job(self, mock_get_conn, 
mock_get_iam_execution_role):
         """
-        Calls 'get_or_create_glue_job' with a existing job.
-        Should retrieve existing one.
+        Calls 'create_or_update_glue_job' with no existing job.
+        Should create a new job.
         """
-        expected_job_name = "simple-job"
-        mock_get_conn.return_value.get_job.return_value = {"Job": {"Name": 
expected_job_name}}
 
-        some_script = "s3:/glue-examples/glue-scripts/sample_aws_glue_job.py"
-        some_s3_bucket = "my-includes"
+        class JobNotFoundException(Exception):
+            pass
+
+        expected_job_name = "aws_test_glue_job"
+        job_description = "This is test case job from Airflow"
+        role_name = "my_test_role"
+        role_name_arn = "test_role"
+        some_s3_bucket = "bucket"
+
+        mock_get_conn.return_value.exceptions.EntityNotFoundException = 
JobNotFoundException
+        mock_get_conn.return_value.get_job.side_effect = JobNotFoundException()
+        mock_get_iam_execution_role.return_value = {"Role": {"RoleName": 
role_name, "Arn": role_name_arn}}
 
         hook = GlueJobHook(
-            job_name="aws_test_glue_job",
-            desc="This is test case job from Airflow",
-            script_location=some_script,
-            iam_role_name="my_test_role",
             s3_bucket=some_s3_bucket,
+            job_name=expected_job_name,
+            desc=job_description,
+            concurrent_run_limit=2,
+            retry_limit=3,
+            num_of_dpus=5,
+            iam_role_name=role_name,
+            create_job_kwargs={"Command": {}},
             region_name=self.some_aws_region,
         )
 
-        result = hook.get_or_create_glue_job()
-
-        mock_get_conn.assert_called_once()
-        
mock_get_conn.return_value.get_job.assert_called_once_with(JobName=hook.job_name)
+        result = hook.create_or_update_glue_job()
+
+        
mock_get_conn.return_value.get_job.assert_called_once_with(JobName=expected_job_name)
+        mock_get_conn.return_value.create_job.assert_called_once_with(
+            Command={},
+            Description=job_description,
+            ExecutionProperty={"MaxConcurrentRuns": 2},
+            LogUri=f"s3://{some_s3_bucket}/logs/glue-logs/{expected_job_name}",
+            MaxCapacity=5,
+            MaxRetries=3,
+            Name=expected_job_name,
+            Role=role_name_arn,
+        )
+        mock_get_conn.return_value.update_job.assert_not_called()
         assert result == expected_job_name
 
-    @mock_glue
     @mock.patch.object(GlueJobHook, "get_iam_execution_role")
-    def test_get_or_create_glue_job_create_new_job(self, 
mock_get_iam_execution_role):
+    @mock.patch.object(GlueJobHook, "get_conn")
+    def test_create_or_update_glue_job_update_existing_job(self, 
mock_get_conn, mock_get_iam_execution_role):
         """
-        Calls 'get_or_create_glue_job' with no existing job.
-        Should create a new job.
+        Calls 'create_or_update_glue_job' with a existing job.
+        Should update existing job configurations.
         """
-        mock_get_iam_execution_role.return_value = {"Role": {"RoleName": 
"my_test_role", "Arn": "test_role"}}
-        expected_job_name = "aws_test_glue_job"
+        job_name = "aws_test_glue_job"
+        job_description = "This is test case job from Airflow"
+        role_name = "my_test_role"
+        role_name_arn = "test_role"
+        some_script = "s3://glue-examples/glue-scripts/sample_aws_glue_job.py"
+        some_s3_bucket = "my-includes"
+
+        mock_get_conn.return_value.get_job.return_value = {
+            "Job": {
+                "Name": job_name,
+                "Description": "Old description of job",
+                "Role": role_name_arn,
+            }
+        }
+        mock_get_iam_execution_role.return_value = {"Role": {"RoleName": 
role_name, "Arn": "test_role"}}
 
         hook = GlueJobHook(
-            job_name=expected_job_name,
-            desc="This is test case job from Airflow",
-            iam_role_name="my_test_role",
-            script_location="s3://bucket",
-            s3_bucket="bucket",
+            job_name=job_name,
+            desc=job_description,
+            script_location=some_script,
+            iam_role_name=role_name,
+            s3_bucket=some_s3_bucket,
             region_name=self.some_aws_region,
-            create_job_kwargs={"Command": {}},
         )
 
-        result = hook.get_or_create_glue_job()
-
-        assert result == expected_job_name
+        result = hook.create_or_update_glue_job()
+
+        assert mock_get_conn.return_value.get_job.call_count == 2
+        mock_get_conn.return_value.update_job.assert_called_once_with(
+            JobName=job_name,
+            JobUpdate={
+                "Description": job_description,
+                "LogUri": f"s3://{some_s3_bucket}/logs/glue-logs/{job_name}",
+                "Role": role_name_arn,
+                "ExecutionProperty": {"MaxConcurrentRuns": 1},
+                "Command": {"Name": "glueetl", "ScriptLocation": some_script},
+                "MaxRetries": 0,
+                "MaxCapacity": 10,
+            },
+        )
+        assert result == job_name
 
+    @mock_glue
     @mock.patch.object(GlueJobHook, "get_iam_execution_role")
-    @mock.patch.object(GlueJobHook, "get_conn")
-    def test_get_or_create_glue_job_worker_type(self, mock_get_conn, 
mock_get_iam_execution_role):
-        mock_get_iam_execution_role.return_value = 
mock.MagicMock(Role={"RoleName": "my_test_role"})
+    def test_create_or_update_glue_job_worker_type(self, 
mock_get_iam_execution_role):
+        mock_get_iam_execution_role.return_value = {"Role": {"RoleName": 
"my_test_role", "Arn": "test_role"}}
         some_script = "s3:/glue-examples/glue-scripts/sample_aws_glue_job.py"
         some_s3_bucket = "my-includes"
+        expected_job_name = "aws_test_glue_job_worker_type"
 
-        mock_glue_job = mock_get_conn.return_value.get_job()["Job"]["Name"]
         glue_job = GlueJobHook(
-            job_name="aws_test_glue_job",
+            job_name=expected_job_name,
             desc="This is test case job from Airflow",
             script_location=some_script,
             iam_role_name="my_test_role",
             s3_bucket=some_s3_bucket,
             region_name=self.some_aws_region,
             create_job_kwargs={"WorkerType": "G.2X", "NumberOfWorkers": 60},
-        ).get_or_create_glue_job()
-        assert glue_job == mock_glue_job
+        )
+
+        result = glue_job.create_or_update_glue_job()
+
+        assert result == expected_job_name
 
     @mock.patch.object(GlueJobHook, "get_iam_execution_role")
     @mock.patch.object(GlueJobHook, "get_conn")
@@ -152,16 +227,16 @@ class TestGlueJobHook:
             )
 
     @mock.patch.object(GlueJobHook, "get_job_state")
-    @mock.patch.object(GlueJobHook, "get_or_create_glue_job")
+    @mock.patch.object(GlueJobHook, "create_or_update_glue_job")
     @mock.patch.object(GlueJobHook, "get_conn")
-    def test_initialize_job(self, mock_get_conn, mock_get_or_create_glue_job, 
mock_get_job_state):
+    def test_initialize_job(self, mock_get_conn, 
mock_create_or_update_glue_job, mock_get_job_state):
         some_data_path = "s3://glue-datasets/examples/medicare/SampleData.csv"
         some_script_arguments = {"--s3_input_data_path": some_data_path}
         some_run_kwargs = {"NumberOfWorkers": 5}
         some_script = "s3:/glue-examples/glue-scripts/sample_aws_glue_job.py"
         some_s3_bucket = "my-includes"
 
-        mock_get_or_create_glue_job.Name = mock.Mock(Name="aws_test_glue_job")
+        mock_create_or_update_glue_job.Name = 
mock.Mock(Name="aws_test_glue_job")
         mock_get_conn.return_value.start_job_run()
 
         mock_job_run_state = mock_get_job_state.return_value

Reply via email to