This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e0af0b9 AwsGlueJobOperator: add wait_for_completion to Glue job run (#18814)
e0af0b9 is described below
commit e0af0b976c0cc43d2b1aa204d047fe755e4c5be7
Author: ron-damon <[email protected]>
AuthorDate: Thu Oct 7 22:37:06 2021 -0300
AwsGlueJobOperator: add wait_for_completion to Glue job run (#18814)
---
airflow/providers/amazon/aws/operators/glue.py | 23 ++++++++++++++++------
tests/providers/amazon/aws/operators/test_glue.py | 24 +++++++++++++++++++++++
2 files changed, 41 insertions(+), 6 deletions(-)
diff --git a/airflow/providers/amazon/aws/operators/glue.py b/airflow/providers/amazon/aws/operators/glue.py
index 5951177..c10a16b 100644
--- a/airflow/providers/amazon/aws/operators/glue.py
+++ b/airflow/providers/amazon/aws/operators/glue.py
@@ -54,6 +54,8 @@ class AwsGlueJobOperator(BaseOperator):
:type create_job_kwargs: Optional[dict]
:param run_job_kwargs: Extra arguments for Glue Job Run
:type run_job_kwargs: Optional[dict]
+ :param wait_for_completion: Whether or not wait for job run completion. (default: True)
+ :type wait_for_completion: bool
"""
template_fields = ('script_args',)
@@ -80,6 +82,7 @@ class AwsGlueJobOperator(BaseOperator):
iam_role_name: Optional[str] = None,
create_job_kwargs: Optional[dict] = None,
run_job_kwargs: Optional[dict] = None,
+ wait_for_completion: bool = True,
**kwargs,
):
super().__init__(**kwargs)
@@ -98,6 +101,7 @@ class AwsGlueJobOperator(BaseOperator):
self.s3_artifacts_prefix = 'artifacts/glue-scripts/'
self.create_job_kwargs = create_job_kwargs
self.run_job_kwargs = run_job_kwargs or {}
+ self.wait_for_completion = wait_for_completion
def execute(self, context):
"""
@@ -127,13 +131,20 @@ class AwsGlueJobOperator(BaseOperator):
iam_role_name=self.iam_role_name,
create_job_kwargs=self.create_job_kwargs,
)
- self.log.info("Initializing AWS Glue Job: %s", self.job_name)
- glue_job_run = glue_job.initialize_job(self.script_args, self.run_job_kwargs)
- glue_job_run = glue_job.job_completion(self.job_name, glue_job_run['JobRunId'])
self.log.info(
- "AWS Glue Job: %s status: %s. Run Id: %s",
+ "Initializing AWS Glue Job: %s. Wait for completion: %s",
self.job_name,
- glue_job_run['JobRunState'],
- glue_job_run['JobRunId'],
+ self.wait_for_completion,
)
+ glue_job_run = glue_job.initialize_job(self.script_args, self.run_job_kwargs)
+ if self.wait_for_completion:
+ glue_job_run = glue_job.job_completion(self.job_name, glue_job_run['JobRunId'])
+ self.log.info(
+ "AWS Glue Job: %s status: %s. Run Id: %s",
+ self.job_name,
+ glue_job_run['JobRunState'],
+ glue_job_run['JobRunId'],
+ )
+ else:
+ self.log.info("AWS Glue Job: %s. Run Id: %s", self.job_name, glue_job_run['JobRunId'])
return glue_job_run['JobRunId']
diff --git a/tests/providers/amazon/aws/operators/test_glue.py b/tests/providers/amazon/aws/operators/test_glue.py
index aed0b94..24ffe7c 100644
--- a/tests/providers/amazon/aws/operators/test_glue.py
+++ b/tests/providers/amazon/aws/operators/test_glue.py
@@ -60,3 +60,27 @@ class TestAwsGlueJobOperator(unittest.TestCase):
glue.execute(None)
mock_initialize_job.assert_called_once_with({}, {})
assert glue.job_name == 'my_test_job'
+
+ @mock.patch.object(AwsGlueJobHook, 'job_completion')
+ @mock.patch.object(AwsGlueJobHook, 'initialize_job')
+ @mock.patch.object(AwsGlueJobHook, "get_conn")
+ @mock.patch.object(S3Hook, "load_file")
+ def test_execute_without_waiting_for_completion(
+ self, mock_load_file, mock_get_conn, mock_initialize_job, mock_job_completion
+ ):
+ glue = AwsGlueJobOperator(
+ task_id='test_glue_operator',
+ job_name='my_test_job',
+ script_location='s3://glue-examples/glue-scripts/sample_aws_glue_job.py',
+ aws_conn_id='aws_default',
+ region_name='us-west-2',
+ s3_bucket='some_bucket',
+ iam_role_name='my_test_role',
+ wait_for_completion=False,
+ )
+ mock_initialize_job.return_value = {'JobRunState': 'RUNNING', 'JobRunId': '11111'}
+ job_run_id = glue.execute(None)
+ mock_initialize_job.assert_called_once_with({}, {})
+ mock_job_completion.assert_not_called()
+ assert glue.job_name == 'my_test_job'
+ assert job_run_id == '11111'