This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e0af0b9  AwsGlueJobOperator: add wait_for_completion to Glue job run 
(#18814)
e0af0b9 is described below

commit e0af0b976c0cc43d2b1aa204d047fe755e4c5be7
Author: ron-damon <[email protected]>
AuthorDate: Thu Oct 7 22:37:06 2021 -0300

    AwsGlueJobOperator: add wait_for_completion to Glue job run (#18814)
---
 airflow/providers/amazon/aws/operators/glue.py    | 23 ++++++++++++++++------
 tests/providers/amazon/aws/operators/test_glue.py | 24 +++++++++++++++++++++++
 2 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/airflow/providers/amazon/aws/operators/glue.py b/airflow/providers/amazon/aws/operators/glue.py
index 5951177..c10a16b 100644
--- a/airflow/providers/amazon/aws/operators/glue.py
+++ b/airflow/providers/amazon/aws/operators/glue.py
@@ -54,6 +54,8 @@ class AwsGlueJobOperator(BaseOperator):
     :type create_job_kwargs: Optional[dict]
     :param run_job_kwargs: Extra arguments for Glue Job Run
     :type run_job_kwargs: Optional[dict]
+    :param wait_for_completion: Whether or not wait for job run completion. (default: True)
+    :type wait_for_completion: bool
     """
 
     template_fields = ('script_args',)
@@ -80,6 +82,7 @@ class AwsGlueJobOperator(BaseOperator):
         iam_role_name: Optional[str] = None,
         create_job_kwargs: Optional[dict] = None,
         run_job_kwargs: Optional[dict] = None,
+        wait_for_completion: bool = True,
         **kwargs,
     ):
         super().__init__(**kwargs)
@@ -98,6 +101,7 @@ class AwsGlueJobOperator(BaseOperator):
         self.s3_artifacts_prefix = 'artifacts/glue-scripts/'
         self.create_job_kwargs = create_job_kwargs
         self.run_job_kwargs = run_job_kwargs or {}
+        self.wait_for_completion = wait_for_completion
 
     def execute(self, context):
         """
@@ -127,13 +131,20 @@ class AwsGlueJobOperator(BaseOperator):
             iam_role_name=self.iam_role_name,
             create_job_kwargs=self.create_job_kwargs,
         )
-        self.log.info("Initializing AWS Glue Job: %s", self.job_name)
-        glue_job_run = glue_job.initialize_job(self.script_args, self.run_job_kwargs)
-        glue_job_run = glue_job.job_completion(self.job_name, glue_job_run['JobRunId'])
         self.log.info(
-            "AWS Glue Job: %s status: %s. Run Id: %s",
+            "Initializing AWS Glue Job: %s. Wait for completion: %s",
             self.job_name,
-            glue_job_run['JobRunState'],
-            glue_job_run['JobRunId'],
+            self.wait_for_completion,
         )
+        glue_job_run = glue_job.initialize_job(self.script_args, self.run_job_kwargs)
+        if self.wait_for_completion:
+            glue_job_run = glue_job.job_completion(self.job_name, glue_job_run['JobRunId'])
+            self.log.info(
+                "AWS Glue Job: %s status: %s. Run Id: %s",
+                self.job_name,
+                glue_job_run['JobRunState'],
+                glue_job_run['JobRunId'],
+            )
+        else:
+            self.log.info("AWS Glue Job: %s. Run Id: %s", self.job_name, glue_job_run['JobRunId'])
         return glue_job_run['JobRunId']
diff --git a/tests/providers/amazon/aws/operators/test_glue.py b/tests/providers/amazon/aws/operators/test_glue.py
index aed0b94..24ffe7c 100644
--- a/tests/providers/amazon/aws/operators/test_glue.py
+++ b/tests/providers/amazon/aws/operators/test_glue.py
@@ -60,3 +60,27 @@ class TestAwsGlueJobOperator(unittest.TestCase):
         glue.execute(None)
         mock_initialize_job.assert_called_once_with({}, {})
         assert glue.job_name == 'my_test_job'
+
+    @mock.patch.object(AwsGlueJobHook, 'job_completion')
+    @mock.patch.object(AwsGlueJobHook, 'initialize_job')
+    @mock.patch.object(AwsGlueJobHook, "get_conn")
+    @mock.patch.object(S3Hook, "load_file")
+    def test_execute_without_waiting_for_completion(
+        self, mock_load_file, mock_get_conn, mock_initialize_job, mock_job_completion
+    ):
+        glue = AwsGlueJobOperator(
+            task_id='test_glue_operator',
+            job_name='my_test_job',
+            script_location='s3://glue-examples/glue-scripts/sample_aws_glue_job.py',
+            aws_conn_id='aws_default',
+            region_name='us-west-2',
+            s3_bucket='some_bucket',
+            iam_role_name='my_test_role',
+            wait_for_completion=False,
+        )
+        mock_initialize_job.return_value = {'JobRunState': 'RUNNING', 'JobRunId': '11111'}
+        job_run_id = glue.execute(None)
+        mock_initialize_job.assert_called_once_with({}, {})
+        mock_job_completion.assert_not_called()
+        assert glue.job_name == 'my_test_job'
+        assert job_run_id == '11111'

Reply via email to