This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d946d7b013 Fix bug in GlueJobOperator where consecutive runs fail when a local script file is used (#38960)
d946d7b013 is described below

commit d946d7b01368aac4697d2cab0a667e0e3c0fa467
Author: Moritz <[email protected]>
AuthorDate: Sat Apr 13 15:09:32 2024 +0200

    Fix bug in GlueJobOperator where consecutive runs fail when a local script file is used (#38960)
    
    * add replace=True parameter to s3_hook
    
    * add a parameter to control whether the S3 script file should be replaced (defaults to False, so existing behavior is unchanged unless it is enabled)
    
    * linting
    
    * add unit test to cover additional replace_script_file parameter
---
 airflow/providers/amazon/aws/operators/glue.py    |  8 +++++++-
 tests/providers/amazon/aws/operators/test_glue.py | 23 +++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/amazon/aws/operators/glue.py b/airflow/providers/amazon/aws/operators/glue.py
index 00da81f96e..e0add3503c 100644
--- a/airflow/providers/amazon/aws/operators/glue.py
+++ b/airflow/providers/amazon/aws/operators/glue.py
@@ -64,6 +64,7 @@ class GlueJobOperator(BaseOperator):
         (default: False)
     :param verbose: If True, Glue Job Run logs show in the Airflow Task Logs.  
(default: False)
     :param update_config: If True, Operator will update job configuration.  
(default: False)
+    :param replace_script_file: If True, the script file will be replaced in 
S3. (default: False)
     :param stop_job_run_on_kill: If True, Operator will stop the job run when 
task is killed.
     """
 
@@ -105,6 +106,7 @@ class GlueJobOperator(BaseOperator):
         wait_for_completion: bool = True,
         deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         verbose: bool = False,
+        replace_script_file: bool = False,
         update_config: bool = False,
         job_poll_interval: int | float = 6,
         stop_job_run_on_kill: bool = False,
@@ -130,6 +132,7 @@ class GlueJobOperator(BaseOperator):
         self.wait_for_completion = wait_for_completion
         self.verbose = verbose
         self.update_config = update_config
+        self.replace_script_file = replace_script_file
         self.deferrable = deferrable
         self.job_poll_interval = job_poll_interval
         self.stop_job_run_on_kill = stop_job_run_on_kill
@@ -143,7 +146,10 @@ class GlueJobOperator(BaseOperator):
             s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
             script_name = os.path.basename(self.script_location)
             s3_hook.load_file(
-                self.script_location, self.s3_artifacts_prefix + script_name, 
bucket_name=self.s3_bucket
+                self.script_location,
+                self.s3_artifacts_prefix + script_name,
+                bucket_name=self.s3_bucket,
+                replace=self.replace_script_file,
             )
             s3_script_location = 
f"s3://{self.s3_bucket}/{self.s3_artifacts_prefix}{script_name}"
         else:
diff --git a/tests/providers/amazon/aws/operators/test_glue.py b/tests/providers/amazon/aws/operators/test_glue.py
index 272c42e242..e2fc7baf50 100644
--- a/tests/providers/amazon/aws/operators/test_glue.py
+++ b/tests/providers/amazon/aws/operators/test_glue.py
@@ -272,3 +272,26 @@ class TestGlueJobOperator:
             JobName=JOB_NAME,
             JobRunIds=[JOB_RUN_ID],
         )
+
+    @mock.patch.object(GlueJobHook, "get_job_state")
+    @mock.patch.object(GlueJobHook, "initialize_job")
+    @mock.patch.object(GlueJobHook, "get_conn")
+    @mock.patch.object(GlueJobHook, "conn")
+    @mock.patch.object(S3Hook, "load_file")
+    def test_replace_script_file(
+        self, mock_load_file, mock_conn, mock_get_connection, 
mock_initialize_job, mock_get_job_state
+    ):
+        glue = GlueJobOperator(
+            task_id=TASK_ID,
+            job_name=JOB_NAME,
+            script_location="folder/file",
+            s3_bucket="bucket_name",
+            iam_role_name="role_arn",
+            replace_script_file=True,
+        )
+        mock_initialize_job.return_value = {"JobRunState": "RUNNING", 
"JobRunId": JOB_RUN_ID}
+        mock_get_job_state.return_value = "SUCCEEDED"
+        glue.execute(mock.MagicMock())
+        mock_load_file.assert_called_once_with(
+            "folder/file", "artifacts/glue-scripts/file", 
bucket_name="bucket_name", replace=True
+        )

Reply via email to