This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b609ab9001 AWSGlueJobHook updates job configuration if it exists (#27893)
b609ab9001 is described below
commit b609ab9001102b67a047b3078dc0b67fbafcc1e1
Author: Romain Ardiet <[email protected]>
AuthorDate: Tue Dec 6 14:29:22 2022 +0000
AWSGlueJobHook updates job configuration if it exists (#27893)
* AWSGlueJobOperator updates job configuration if it exists
---
airflow/providers/amazon/aws/hooks/glue.py | 136 ++++++++++++----------
tests/providers/amazon/aws/hooks/test_glue.py | 155 +++++++++++++++++++-------
2 files changed, 195 insertions(+), 96 deletions(-)
diff --git a/airflow/providers/amazon/aws/hooks/glue.py b/airflow/providers/amazon/aws/hooks/glue.py
index f318417412..eac304d750 100644
--- a/airflow/providers/amazon/aws/hooks/glue.py
+++ b/airflow/providers/amazon/aws/hooks/glue.py
@@ -92,10 +92,38 @@ class GlueJobHook(AwsBaseHook):
kwargs["client_type"] = "glue"
super().__init__(*args, **kwargs)
+ def create_glue_job_config(self) -> dict:
+ if self.s3_bucket is None:
+ raise ValueError("Could not initialize glue job, error: Specify Parameter `s3_bucket`")
+
+ default_command = {
+ "Name": "glueetl",
+ "ScriptLocation": self.script_location,
+ }
+ command = self.create_job_kwargs.pop("Command", default_command)
+
+ s3_log_path = f"s3://{self.s3_bucket}/{self.s3_glue_logs}{self.job_name}"
+ execution_role = self.get_iam_execution_role()
+
+ ret_config = {
+ "Name": self.job_name,
+ "Description": self.desc,
+ "LogUri": s3_log_path,
+ "Role": execution_role["Role"]["Arn"],
+ "ExecutionProperty": {"MaxConcurrentRuns": self.concurrent_run_limit},
+ "Command": command,
+ "MaxRetries": self.retry_limit,
+ **self.create_job_kwargs,
+ }
+
+ if hasattr(self, "num_of_dpus"):
+ ret_config["MaxCapacity"] = self.num_of_dpus
+
+ return ret_config
+
def list_jobs(self) -> list:
""":return: Lists of Jobs"""
- conn = self.get_conn()
- return conn.get_jobs()
+ return self.get_conn().get_jobs()
def get_iam_execution_role(self) -> dict:
""":return: iam role for job execution"""
@@ -120,14 +148,12 @@ class GlueJobHook(AwsBaseHook):
to run job
:return:
"""
- glue_client = self.get_conn()
script_arguments = script_arguments or {}
run_kwargs = run_kwargs or {}
try:
- job_name = self.get_or_create_glue_job()
- return glue_client.start_job_run(JobName=job_name,
Arguments=script_arguments, **run_kwargs)
-
+ job_name = self.create_or_update_glue_job()
+ return self.get_conn().start_job_run(JobName=job_name, Arguments=script_arguments, **run_kwargs)
except Exception as general_error:
self.log.error("Failed to run aws glue job, error: %s",
general_error)
raise
@@ -140,8 +166,7 @@ class GlueJobHook(AwsBaseHook):
:param run_id: The job-run ID of the predecessor job run
:return: State of the Glue job
"""
- glue_client = self.get_conn()
- job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id,
PredecessorsIncluded=True)
+ job_run = self.get_conn().get_job_run(JobName=job_name, RunId=run_id, PredecessorsIncluded=True)
return job_run["JobRun"]["JobRunState"]
def print_job_logs(
@@ -231,54 +256,53 @@ class GlueJobHook(AwsBaseHook):
next_token=next_log_token,
)
- def get_or_create_glue_job(self) -> str:
+ def has_job(self, job_name) -> bool:
"""
- Creates(or just returns) and returns the Job name
- :return:Name of the Job
+ Checks if the job already exists
+
+ :param job_name: unique job name per AWS account
+ :return: Returns True if the job already exists and False if not.
"""
- glue_client = self.get_conn()
+ self.log.info("Checking if job already exists: %s", job_name)
+
try:
- get_job_response = glue_client.get_job(JobName=self.job_name)
- self.log.info("Job Already exist. Returning Name of the job")
- return get_job_response["Job"]["Name"]
-
- except glue_client.exceptions.EntityNotFoundException:
- self.log.info("Job doesn't exist. Now creating and running AWS
Glue Job")
- if self.s3_bucket is None:
- raise AirflowException("Could not initialize glue job, error:
Specify Parameter `s3_bucket`")
- s3_log_path =
f"s3://{self.s3_bucket}/{self.s3_glue_logs}{self.job_name}"
- execution_role = self.get_iam_execution_role()
- try:
- default_command = {
- "Name": "glueetl",
- "ScriptLocation": self.script_location,
- }
- command = self.create_job_kwargs.pop("Command",
default_command)
-
- if "WorkerType" in self.create_job_kwargs and
"NumberOfWorkers" in self.create_job_kwargs:
- create_job_response = glue_client.create_job(
- Name=self.job_name,
- Description=self.desc,
- LogUri=s3_log_path,
- Role=execution_role["Role"]["Arn"],
- ExecutionProperty={"MaxConcurrentRuns":
self.concurrent_run_limit},
- Command=command,
- MaxRetries=self.retry_limit,
- **self.create_job_kwargs,
- )
- else:
- create_job_response = glue_client.create_job(
- Name=self.job_name,
- Description=self.desc,
- LogUri=s3_log_path,
- Role=execution_role["Role"]["Arn"],
- ExecutionProperty={"MaxConcurrentRuns":
self.concurrent_run_limit},
- Command=command,
- MaxRetries=self.retry_limit,
- MaxCapacity=self.num_of_dpus,
- **self.create_job_kwargs,
- )
- return create_job_response["Name"]
- except Exception as general_error:
- self.log.error("Failed to create aws glue job, error: %s",
general_error)
- raise
+ self.get_conn().get_job(JobName=job_name)
+ return True
+ except self.get_conn().exceptions.EntityNotFoundException:
+ return False
+
+ def update_job(self, **job_kwargs) -> bool:
+ """
+ Updates job configurations
+
+ :param job_kwargs: Keyword args that define the configurations used for the job
+ :return: True if job was updated and false otherwise
+ """
+ job_name = job_kwargs.pop("Name")
+ current_job = self.get_conn().get_job(JobName=job_name)["Job"]
+
+ update_config = {
+ key: value for key, value in job_kwargs.items() if current_job.get(key) != job_kwargs[key]
+ }
+ if update_config != {}:
+ self.log.info("Updating job: %s", job_name)
+ self.get_conn().update_job(JobName=job_name, JobUpdate=job_kwargs)
+ self.log.info("Updated configurations: %s", update_config)
+ return True
+ else:
+ return False
+
+ def create_or_update_glue_job(self) -> str | None:
+ """
+ Creates(or updates) and returns the Job name
+ :return:Name of the Job
+ """
+ config = self.create_glue_job_config()
+
+ if self.has_job(self.job_name):
+ self.update_job(**config)
+ else:
+ self.log.info("Creating job: %s", self.job_name)
+ self.get_conn().create_job(**config)
+
+ return self.job_name
diff --git a/tests/providers/amazon/aws/hooks/test_glue.py b/tests/providers/amazon/aws/hooks/test_glue.py
index 18e252a039..4c64728dc8 100644
--- a/tests/providers/amazon/aws/hooks/test_glue.py
+++ b/tests/providers/amazon/aws/hooks/test_glue.py
@@ -24,6 +24,7 @@ import boto3
import pytest
from moto import mock_glue, mock_iam
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
@@ -62,75 +63,149 @@ class TestGlueJobHook:
assert "Arn" in iam_role["Role"]
assert iam_role["Role"]["Arn"] ==
f"arn:aws:iam::123456789012:role{role_path}{expected_role}"
+ @mock.patch.object(AwsBaseHook, "get_conn")
+ def test_has_job_exists(self, mock_get_conn):
+ job_name = "aws_test_glue_job"
+ mock_get_conn.return_value.get_job.return_value = {"Job": {"Name":
job_name}}
+
+ hook = GlueJobHook(aws_conn_id=None, job_name=job_name,
s3_bucket="some_bucket")
+ result = hook.has_job(job_name)
+ assert result is True
+
mock_get_conn.return_value.get_job.assert_called_once_with(JobName=hook.job_name)
+
+ @mock.patch.object(AwsBaseHook, "get_conn")
+ def test_has_job_job_doesnt_exists(self, mock_get_conn):
+ class JobNotFoundException(Exception):
+ pass
+
+ mock_get_conn.return_value.exceptions.EntityNotFoundException =
JobNotFoundException
+ mock_get_conn.return_value.get_job.side_effect = JobNotFoundException()
+
+ job_name = "aws_test_glue_job"
+ hook = GlueJobHook(aws_conn_id=None, job_name=job_name,
s3_bucket="some_bucket")
+ result = hook.has_job(job_name)
+ assert result is False
+
mock_get_conn.return_value.get_job.assert_called_once_with(JobName=job_name)
+
+ @mock.patch.object(GlueJobHook, "get_iam_execution_role")
@mock.patch.object(GlueJobHook, "get_conn")
- def test_get_or_create_glue_job_get_existing_job(self, mock_get_conn):
+ def test_create_or_update_glue_job_create_new_job(self, mock_get_conn,
mock_get_iam_execution_role):
"""
- Calls 'get_or_create_glue_job' with a existing job.
- Should retrieve existing one.
+ Calls 'create_or_update_glue_job' with no existing job.
+ Should create a new job.
"""
- expected_job_name = "simple-job"
- mock_get_conn.return_value.get_job.return_value = {"Job": {"Name":
expected_job_name}}
- some_script = "s3:/glue-examples/glue-scripts/sample_aws_glue_job.py"
- some_s3_bucket = "my-includes"
+ class JobNotFoundException(Exception):
+ pass
+
+ expected_job_name = "aws_test_glue_job"
+ job_description = "This is test case job from Airflow"
+ role_name = "my_test_role"
+ role_name_arn = "test_role"
+ some_s3_bucket = "bucket"
+
+ mock_get_conn.return_value.exceptions.EntityNotFoundException =
JobNotFoundException
+ mock_get_conn.return_value.get_job.side_effect = JobNotFoundException()
+ mock_get_iam_execution_role.return_value = {"Role": {"RoleName":
role_name, "Arn": role_name_arn}}
hook = GlueJobHook(
- job_name="aws_test_glue_job",
- desc="This is test case job from Airflow",
- script_location=some_script,
- iam_role_name="my_test_role",
s3_bucket=some_s3_bucket,
+ job_name=expected_job_name,
+ desc=job_description,
+ concurrent_run_limit=2,
+ retry_limit=3,
+ num_of_dpus=5,
+ iam_role_name=role_name,
+ create_job_kwargs={"Command": {}},
region_name=self.some_aws_region,
)
- result = hook.get_or_create_glue_job()
-
- mock_get_conn.assert_called_once()
-
mock_get_conn.return_value.get_job.assert_called_once_with(JobName=hook.job_name)
+ result = hook.create_or_update_glue_job()
+
+
mock_get_conn.return_value.get_job.assert_called_once_with(JobName=expected_job_name)
+ mock_get_conn.return_value.create_job.assert_called_once_with(
+ Command={},
+ Description=job_description,
+ ExecutionProperty={"MaxConcurrentRuns": 2},
+ LogUri=f"s3://{some_s3_bucket}/logs/glue-logs/{expected_job_name}",
+ MaxCapacity=5,
+ MaxRetries=3,
+ Name=expected_job_name,
+ Role=role_name_arn,
+ )
+ mock_get_conn.return_value.update_job.assert_not_called()
assert result == expected_job_name
- @mock_glue
@mock.patch.object(GlueJobHook, "get_iam_execution_role")
- def test_get_or_create_glue_job_create_new_job(self,
mock_get_iam_execution_role):
+ @mock.patch.object(GlueJobHook, "get_conn")
+ def test_create_or_update_glue_job_update_existing_job(self,
mock_get_conn, mock_get_iam_execution_role):
"""
- Calls 'get_or_create_glue_job' with no existing job.
- Should create a new job.
+ Calls 'create_or_update_glue_job' with a existing job.
+ Should update existing job configurations.
"""
- mock_get_iam_execution_role.return_value = {"Role": {"RoleName":
"my_test_role", "Arn": "test_role"}}
- expected_job_name = "aws_test_glue_job"
+ job_name = "aws_test_glue_job"
+ job_description = "This is test case job from Airflow"
+ role_name = "my_test_role"
+ role_name_arn = "test_role"
+ some_script = "s3://glue-examples/glue-scripts/sample_aws_glue_job.py"
+ some_s3_bucket = "my-includes"
+
+ mock_get_conn.return_value.get_job.return_value = {
+ "Job": {
+ "Name": job_name,
+ "Description": "Old description of job",
+ "Role": role_name_arn,
+ }
+ }
+ mock_get_iam_execution_role.return_value = {"Role": {"RoleName":
role_name, "Arn": "test_role"}}
hook = GlueJobHook(
- job_name=expected_job_name,
- desc="This is test case job from Airflow",
- iam_role_name="my_test_role",
- script_location="s3://bucket",
- s3_bucket="bucket",
+ job_name=job_name,
+ desc=job_description,
+ script_location=some_script,
+ iam_role_name=role_name,
+ s3_bucket=some_s3_bucket,
region_name=self.some_aws_region,
- create_job_kwargs={"Command": {}},
)
- result = hook.get_or_create_glue_job()
-
- assert result == expected_job_name
+ result = hook.create_or_update_glue_job()
+
+ assert mock_get_conn.return_value.get_job.call_count == 2
+ mock_get_conn.return_value.update_job.assert_called_once_with(
+ JobName=job_name,
+ JobUpdate={
+ "Description": job_description,
+ "LogUri": f"s3://{some_s3_bucket}/logs/glue-logs/{job_name}",
+ "Role": role_name_arn,
+ "ExecutionProperty": {"MaxConcurrentRuns": 1},
+ "Command": {"Name": "glueetl", "ScriptLocation": some_script},
+ "MaxRetries": 0,
+ "MaxCapacity": 10,
+ },
+ )
+ assert result == job_name
+ @mock_glue
@mock.patch.object(GlueJobHook, "get_iam_execution_role")
- @mock.patch.object(GlueJobHook, "get_conn")
- def test_get_or_create_glue_job_worker_type(self, mock_get_conn,
mock_get_iam_execution_role):
- mock_get_iam_execution_role.return_value =
mock.MagicMock(Role={"RoleName": "my_test_role"})
+ def test_create_or_update_glue_job_worker_type(self,
mock_get_iam_execution_role):
+ mock_get_iam_execution_role.return_value = {"Role": {"RoleName":
"my_test_role", "Arn": "test_role"}}
some_script = "s3:/glue-examples/glue-scripts/sample_aws_glue_job.py"
some_s3_bucket = "my-includes"
+ expected_job_name = "aws_test_glue_job_worker_type"
- mock_glue_job = mock_get_conn.return_value.get_job()["Job"]["Name"]
glue_job = GlueJobHook(
- job_name="aws_test_glue_job",
+ job_name=expected_job_name,
desc="This is test case job from Airflow",
script_location=some_script,
iam_role_name="my_test_role",
s3_bucket=some_s3_bucket,
region_name=self.some_aws_region,
create_job_kwargs={"WorkerType": "G.2X", "NumberOfWorkers": 60},
- ).get_or_create_glue_job()
- assert glue_job == mock_glue_job
+ )
+
+ result = glue_job.create_or_update_glue_job()
+
+ assert result == expected_job_name
@mock.patch.object(GlueJobHook, "get_iam_execution_role")
@mock.patch.object(GlueJobHook, "get_conn")
@@ -152,16 +227,16 @@ class TestGlueJobHook:
)
@mock.patch.object(GlueJobHook, "get_job_state")
- @mock.patch.object(GlueJobHook, "get_or_create_glue_job")
+ @mock.patch.object(GlueJobHook, "create_or_update_glue_job")
@mock.patch.object(GlueJobHook, "get_conn")
- def test_initialize_job(self, mock_get_conn, mock_get_or_create_glue_job,
mock_get_job_state):
+ def test_initialize_job(self, mock_get_conn,
mock_create_or_update_glue_job, mock_get_job_state):
some_data_path = "s3://glue-datasets/examples/medicare/SampleData.csv"
some_script_arguments = {"--s3_input_data_path": some_data_path}
some_run_kwargs = {"NumberOfWorkers": 5}
some_script = "s3:/glue-examples/glue-scripts/sample_aws_glue_job.py"
some_s3_bucket = "my-includes"
- mock_get_or_create_glue_job.Name = mock.Mock(Name="aws_test_glue_job")
+ mock_create_or_update_glue_job.Name =
mock.Mock(Name="aws_test_glue_job")
mock_get_conn.return_value.start_job_run()
mock_job_run_state = mock_get_job_state.return_value