This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d946d7b013 Fix bug in GlueJobOperator where consecutive runs fail when
a local script file is used (#38960)
d946d7b013 is described below
commit d946d7b01368aac4697d2cab0a667e0e3c0fa467
Author: Moritz <[email protected]>
AuthorDate: Sat Apr 13 15:09:32 2024 +0200
Fix bug in GlueJobOperator where consecutive runs fail when a local script
file is used (#38960)
* add replace=True parameter to s3_hook
* add a parameter to control whether the S3 file should be replaced or not
* linting
* add unit test to cover additional replace_script_file parameter
---
airflow/providers/amazon/aws/operators/glue.py | 8 +++++++-
tests/providers/amazon/aws/operators/test_glue.py | 23 +++++++++++++++++++++++
2 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/amazon/aws/operators/glue.py
b/airflow/providers/amazon/aws/operators/glue.py
index 00da81f96e..e0add3503c 100644
--- a/airflow/providers/amazon/aws/operators/glue.py
+++ b/airflow/providers/amazon/aws/operators/glue.py
@@ -64,6 +64,7 @@ class GlueJobOperator(BaseOperator):
(default: False)
:param verbose: If True, Glue Job Run logs show in the Airflow Task Logs.
(default: False)
:param update_config: If True, Operator will update job configuration.
(default: False)
+ :param replace_script_file: If True, the script file will be replaced in
S3. (default: False)
:param stop_job_run_on_kill: If True, Operator will stop the job run when
task is killed.
"""
@@ -105,6 +106,7 @@ class GlueJobOperator(BaseOperator):
wait_for_completion: bool = True,
deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
verbose: bool = False,
+ replace_script_file: bool = False,
update_config: bool = False,
job_poll_interval: int | float = 6,
stop_job_run_on_kill: bool = False,
@@ -130,6 +132,7 @@ class GlueJobOperator(BaseOperator):
self.wait_for_completion = wait_for_completion
self.verbose = verbose
self.update_config = update_config
+ self.replace_script_file = replace_script_file
self.deferrable = deferrable
self.job_poll_interval = job_poll_interval
self.stop_job_run_on_kill = stop_job_run_on_kill
@@ -143,7 +146,10 @@ class GlueJobOperator(BaseOperator):
s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
script_name = os.path.basename(self.script_location)
s3_hook.load_file(
- self.script_location, self.s3_artifacts_prefix + script_name,
bucket_name=self.s3_bucket
+ self.script_location,
+ self.s3_artifacts_prefix + script_name,
+ bucket_name=self.s3_bucket,
+ replace=self.replace_script_file,
)
s3_script_location =
f"s3://{self.s3_bucket}/{self.s3_artifacts_prefix}{script_name}"
else:
diff --git a/tests/providers/amazon/aws/operators/test_glue.py
b/tests/providers/amazon/aws/operators/test_glue.py
index 272c42e242..e2fc7baf50 100644
--- a/tests/providers/amazon/aws/operators/test_glue.py
+++ b/tests/providers/amazon/aws/operators/test_glue.py
@@ -272,3 +272,26 @@ class TestGlueJobOperator:
JobName=JOB_NAME,
JobRunIds=[JOB_RUN_ID],
)
+
+ @mock.patch.object(GlueJobHook, "get_job_state")
+ @mock.patch.object(GlueJobHook, "initialize_job")
+ @mock.patch.object(GlueJobHook, "get_conn")
+ @mock.patch.object(GlueJobHook, "conn")
+ @mock.patch.object(S3Hook, "load_file")
+ def test_replace_script_file(
+ self, mock_load_file, mock_conn, mock_get_connection,
mock_initialize_job, mock_get_job_state
+ ):
+ glue = GlueJobOperator(
+ task_id=TASK_ID,
+ job_name=JOB_NAME,
+ script_location="folder/file",
+ s3_bucket="bucket_name",
+ iam_role_name="role_arn",
+ replace_script_file=True,
+ )
+ mock_initialize_job.return_value = {"JobRunState": "RUNNING",
"JobRunId": JOB_RUN_ID}
+ mock_get_job_state.return_value = "SUCCEEDED"
+ glue.execute(mock.MagicMock())
+ mock_load_file.assert_called_once_with(
+ "folder/file", "artifacts/glue-scripts/file",
bucket_name="bucket_name", replace=True
+ )