This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cc87ae578e feature: AWS - GlueJobOperator - job_poll_interval (#32147)
cc87ae578e is described below
commit cc87ae578ebf930fe5e83bef515e07c327cac268
Author: raphaelauv <[email protected]>
AuthorDate: Mon Jun 26 16:21:23 2023 +0200
feature: AWS - GlueJobOperator - job_poll_interval (#32147)
---
airflow/providers/amazon/aws/hooks/glue.py | 8 ++++----
airflow/providers/amazon/aws/operators/glue.py | 4 ++++
airflow/providers/amazon/aws/triggers/glue.py | 6 +++++-
tests/providers/amazon/aws/hooks/test_glue.py | 12 ++++--------
tests/providers/amazon/aws/triggers/test_glue.py | 4 ++--
5 files changed, 19 insertions(+), 15 deletions(-)
diff --git a/airflow/providers/amazon/aws/hooks/glue.py b/airflow/providers/amazon/aws/hooks/glue.py
index e313b95f93..421d11c09e 100644
--- a/airflow/providers/amazon/aws/hooks/glue.py
+++ b/airflow/providers/amazon/aws/hooks/glue.py
@@ -54,8 +54,6 @@ class GlueJobHook(AwsBaseHook):
- :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
"""
- JOB_POLL_INTERVAL = 6 # polls job status after every JOB_POLL_INTERVAL seconds
-
class LogContinuationTokens:
"""Used to hold the continuation tokens when reading logs from both streams Glue Jobs write to."""
@@ -75,6 +73,7 @@ class GlueJobHook(AwsBaseHook):
iam_role_name: str | None = None,
create_job_kwargs: dict | None = None,
update_config: bool = False,
+ job_poll_interval: int | float = 6,
*args,
**kwargs,
):
@@ -88,6 +87,7 @@ class GlueJobHook(AwsBaseHook):
self.s3_glue_logs = "logs/glue-logs/"
self.create_job_kwargs = create_job_kwargs or {}
self.update_config = update_config
+ self.job_poll_interval = job_poll_interval
worker_type_exists = "WorkerType" in self.create_job_kwargs
num_workers_exists = "NumberOfWorkers" in self.create_job_kwargs
@@ -278,7 +278,7 @@ class GlueJobHook(AwsBaseHook):
if ret:
return ret
else:
- time.sleep(self.JOB_POLL_INTERVAL)
+ time.sleep(self.job_poll_interval)
async def async_job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> dict[str, str]:
"""
@@ -297,7 +297,7 @@ class GlueJobHook(AwsBaseHook):
if ret:
return ret
else:
- await asyncio.sleep(self.JOB_POLL_INTERVAL)
+ await asyncio.sleep(self.job_poll_interval)
def _handle_state(
self,
diff --git a/airflow/providers/amazon/aws/operators/glue.py b/airflow/providers/amazon/aws/operators/glue.py
index 37010b6fd8..1d6146e42b 100644
--- a/airflow/providers/amazon/aws/operators/glue.py
+++ b/airflow/providers/amazon/aws/operators/glue.py
@@ -99,6 +99,7 @@ class GlueJobOperator(BaseOperator):
deferrable: bool = False,
verbose: bool = False,
update_config: bool = False,
+ job_poll_interval: int | float = 6,
**kwargs,
):
super().__init__(**kwargs)
@@ -121,6 +122,7 @@ class GlueJobOperator(BaseOperator):
self.verbose = verbose
self.update_config = update_config
self.deferrable = deferrable
+ self.job_poll_interval = job_poll_interval
def execute(self, context: Context):
"""Execute AWS Glue Job from Airflow.
@@ -151,6 +153,7 @@ class GlueJobOperator(BaseOperator):
iam_role_name=self.iam_role_name,
create_job_kwargs=self.create_job_kwargs,
update_config=self.update_config,
+ job_poll_interval=self.job_poll_interval,
)
self.log.info(
"Initializing AWS Glue Job: %s. Wait for completion: %s",
@@ -181,6 +184,7 @@ class GlueJobOperator(BaseOperator):
run_id=glue_job_run["JobRunId"],
verbose=self.verbose,
aws_conn_id=self.aws_conn_id,
+ job_poll_interval=self.job_poll_interval,
),
method_name="execute_complete",
)
diff --git a/airflow/providers/amazon/aws/triggers/glue.py b/airflow/providers/amazon/aws/triggers/glue.py
index 71df2f6cb5..00529330a7 100644
--- a/airflow/providers/amazon/aws/triggers/glue.py
+++ b/airflow/providers/amazon/aws/triggers/glue.py
@@ -39,11 +39,14 @@ class GlueJobCompleteTrigger(BaseTrigger):
run_id: str,
verbose: bool,
aws_conn_id: str,
+ job_poll_interval: int | float,
):
+ super().__init__()
self.job_name = job_name
self.run_id = run_id
self.verbose = verbose
self.aws_conn_id = aws_conn_id
+ self.job_poll_interval = job_poll_interval
def serialize(self) -> tuple[str, dict[str, Any]]:
return (
@@ -54,10 +57,11 @@ class GlueJobCompleteTrigger(BaseTrigger):
"run_id": self.run_id,
"verbose": str(self.verbose),
"aws_conn_id": self.aws_conn_id,
+ "job_poll_interval": self.job_poll_interval,
},
)
async def run(self) -> AsyncIterator[TriggerEvent]:
- hook = GlueJobHook(aws_conn_id=self.aws_conn_id)
+ hook = GlueJobHook(aws_conn_id=self.aws_conn_id, job_poll_interval=self.job_poll_interval)
await hook.async_job_completion(self.job_name, self.run_id, self.verbose)
yield TriggerEvent({"status": "success", "message": "Job done", "value": self.run_id})
diff --git a/tests/providers/amazon/aws/hooks/test_glue.py b/tests/providers/amazon/aws/hooks/test_glue.py
index 1497affd86..c41598f3d9 100644
--- a/tests/providers/amazon/aws/hooks/test_glue.py
+++ b/tests/providers/amazon/aws/hooks/test_glue.py
@@ -353,8 +353,7 @@ class TestGlueJobHook:
@mock.patch.object(GlueJobHook, "get_job_state")
def test_job_completion_success(self, get_state_mock: MagicMock):
- hook = GlueJobHook()
- hook.JOB_POLL_INTERVAL = 0
+ hook = GlueJobHook(job_poll_interval=0)
get_state_mock.side_effect = [
"RUNNING",
"RUNNING",
@@ -368,8 +367,7 @@ class TestGlueJobHook:
@mock.patch.object(GlueJobHook, "get_job_state")
def test_job_completion_failure(self, get_state_mock: MagicMock):
- hook = GlueJobHook()
- hook.JOB_POLL_INTERVAL = 0
+ hook = GlueJobHook(job_poll_interval=0)
get_state_mock.side_effect = [
"RUNNING",
"RUNNING",
@@ -384,8 +382,7 @@ class TestGlueJobHook:
@pytest.mark.asyncio
@mock.patch.object(GlueJobHook, "async_get_job_state")
async def test_async_job_completion_success(self, get_state_mock: MagicMock):
- hook = GlueJobHook()
- hook.JOB_POLL_INTERVAL = 0
+ hook = GlueJobHook(job_poll_interval=0)
get_state_mock.side_effect = [
"RUNNING",
"RUNNING",
@@ -400,8 +397,7 @@ class TestGlueJobHook:
@pytest.mark.asyncio
@mock.patch.object(GlueJobHook, "async_get_job_state")
async def test_async_job_completion_failure(self, get_state_mock: MagicMock):
- hook = GlueJobHook()
- hook.JOB_POLL_INTERVAL = 0
+ hook = GlueJobHook(job_poll_interval=0)
get_state_mock.side_effect = [
"RUNNING",
"RUNNING",
diff --git a/tests/providers/amazon/aws/triggers/test_glue.py b/tests/providers/amazon/aws/triggers/test_glue.py
index 014658b950..cc98ecc748 100644
--- a/tests/providers/amazon/aws/triggers/test_glue.py
+++ b/tests/providers/amazon/aws/triggers/test_glue.py
@@ -30,12 +30,12 @@ class TestGlueJobTrigger:
@pytest.mark.asyncio
@mock.patch.object(GlueJobHook, "async_get_job_state")
async def test_wait_job(self, get_state_mock: mock.MagicMock):
- GlueJobHook.JOB_POLL_INTERVAL = 0.1
trigger = GlueJobCompleteTrigger(
job_name="job_name",
run_id="JobRunId",
verbose=False,
aws_conn_id="aws_conn_id",
+ job_poll_interval=0.1,
)
get_state_mock.side_effect = [
"RUNNING",
@@ -52,12 +52,12 @@ class TestGlueJobTrigger:
@pytest.mark.asyncio
@mock.patch.object(GlueJobHook, "async_get_job_state")
async def test_wait_job_failed(self, get_state_mock: mock.MagicMock):
- GlueJobHook.JOB_POLL_INTERVAL = 0.1
trigger = GlueJobCompleteTrigger(
job_name="job_name",
run_id="JobRunId",
verbose=False,
aws_conn_id="aws_conn_id",
+ job_poll_interval=0.1,
)
get_state_mock.side_effect = [
"RUNNING",