This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cc87ae578e feature: AWS - GlueJobOperator - job_poll_interval (#32147)
cc87ae578e is described below

commit cc87ae578ebf930fe5e83bef515e07c327cac268
Author: raphaelauv <[email protected]>
AuthorDate: Mon Jun 26 16:21:23 2023 +0200

    feature: AWS - GlueJobOperator - job_poll_interval (#32147)
---
 airflow/providers/amazon/aws/hooks/glue.py       |  8 ++++----
 airflow/providers/amazon/aws/operators/glue.py   |  4 ++++
 airflow/providers/amazon/aws/triggers/glue.py    |  6 +++++-
 tests/providers/amazon/aws/hooks/test_glue.py    | 12 ++++--------
 tests/providers/amazon/aws/triggers/test_glue.py |  4 ++--
 5 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/airflow/providers/amazon/aws/hooks/glue.py b/airflow/providers/amazon/aws/hooks/glue.py
index e313b95f93..421d11c09e 100644
--- a/airflow/providers/amazon/aws/hooks/glue.py
+++ b/airflow/providers/amazon/aws/hooks/glue.py
@@ -54,8 +54,6 @@ class GlueJobHook(AwsBaseHook):
         - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
     """
 
-    JOB_POLL_INTERVAL = 6  # polls job status after every JOB_POLL_INTERVAL seconds
-
     class LogContinuationTokens:
         """Used to hold the continuation tokens when reading logs from both streams Glue Jobs write to."""
 
@@ -75,6 +73,7 @@ class GlueJobHook(AwsBaseHook):
         iam_role_name: str | None = None,
         create_job_kwargs: dict | None = None,
         update_config: bool = False,
+        job_poll_interval: int | float = 6,
         *args,
         **kwargs,
     ):
@@ -88,6 +87,7 @@ class GlueJobHook(AwsBaseHook):
         self.s3_glue_logs = "logs/glue-logs/"
         self.create_job_kwargs = create_job_kwargs or {}
         self.update_config = update_config
+        self.job_poll_interval = job_poll_interval
 
         worker_type_exists = "WorkerType" in self.create_job_kwargs
         num_workers_exists = "NumberOfWorkers" in self.create_job_kwargs
@@ -278,7 +278,7 @@ class GlueJobHook(AwsBaseHook):
             if ret:
                 return ret
             else:
-                time.sleep(self.JOB_POLL_INTERVAL)
+                time.sleep(self.job_poll_interval)
 
     async def async_job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> dict[str, str]:
         """
@@ -297,7 +297,7 @@ class GlueJobHook(AwsBaseHook):
             if ret:
                 return ret
             else:
-                await asyncio.sleep(self.JOB_POLL_INTERVAL)
+                await asyncio.sleep(self.job_poll_interval)
 
     def _handle_state(
         self,
diff --git a/airflow/providers/amazon/aws/operators/glue.py b/airflow/providers/amazon/aws/operators/glue.py
index 37010b6fd8..1d6146e42b 100644
--- a/airflow/providers/amazon/aws/operators/glue.py
+++ b/airflow/providers/amazon/aws/operators/glue.py
@@ -99,6 +99,7 @@ class GlueJobOperator(BaseOperator):
         deferrable: bool = False,
         verbose: bool = False,
         update_config: bool = False,
+        job_poll_interval: int | float = 6,
         **kwargs,
     ):
         super().__init__(**kwargs)
@@ -121,6 +122,7 @@ class GlueJobOperator(BaseOperator):
         self.verbose = verbose
         self.update_config = update_config
         self.deferrable = deferrable
+        self.job_poll_interval = job_poll_interval
 
     def execute(self, context: Context):
         """Execute AWS Glue Job from Airflow.
@@ -151,6 +153,7 @@ class GlueJobOperator(BaseOperator):
             iam_role_name=self.iam_role_name,
             create_job_kwargs=self.create_job_kwargs,
             update_config=self.update_config,
+            job_poll_interval=self.job_poll_interval,
         )
         self.log.info(
             "Initializing AWS Glue Job: %s. Wait for completion: %s",
@@ -181,6 +184,7 @@ class GlueJobOperator(BaseOperator):
                     run_id=glue_job_run["JobRunId"],
                     verbose=self.verbose,
                     aws_conn_id=self.aws_conn_id,
+                    job_poll_interval=self.job_poll_interval,
                 ),
                 method_name="execute_complete",
             )
diff --git a/airflow/providers/amazon/aws/triggers/glue.py b/airflow/providers/amazon/aws/triggers/glue.py
index 71df2f6cb5..00529330a7 100644
--- a/airflow/providers/amazon/aws/triggers/glue.py
+++ b/airflow/providers/amazon/aws/triggers/glue.py
@@ -39,11 +39,14 @@ class GlueJobCompleteTrigger(BaseTrigger):
         run_id: str,
         verbose: bool,
         aws_conn_id: str,
+        job_poll_interval: int | float,
     ):
+        super().__init__()
         self.job_name = job_name
         self.run_id = run_id
         self.verbose = verbose
         self.aws_conn_id = aws_conn_id
+        self.job_poll_interval = job_poll_interval
 
     def serialize(self) -> tuple[str, dict[str, Any]]:
         return (
@@ -54,10 +57,11 @@ class GlueJobCompleteTrigger(BaseTrigger):
                 "run_id": self.run_id,
                 "verbose": str(self.verbose),
                 "aws_conn_id": self.aws_conn_id,
+                "job_poll_interval": self.job_poll_interval,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        hook = GlueJobHook(aws_conn_id=self.aws_conn_id)
+        hook = GlueJobHook(aws_conn_id=self.aws_conn_id, job_poll_interval=self.job_poll_interval)
         await hook.async_job_completion(self.job_name, self.run_id, self.verbose)
         yield TriggerEvent({"status": "success", "message": "Job done", "value": self.run_id})
diff --git a/tests/providers/amazon/aws/hooks/test_glue.py b/tests/providers/amazon/aws/hooks/test_glue.py
index 1497affd86..c41598f3d9 100644
--- a/tests/providers/amazon/aws/hooks/test_glue.py
+++ b/tests/providers/amazon/aws/hooks/test_glue.py
@@ -353,8 +353,7 @@ class TestGlueJobHook:
 
     @mock.patch.object(GlueJobHook, "get_job_state")
     def test_job_completion_success(self, get_state_mock: MagicMock):
-        hook = GlueJobHook()
-        hook.JOB_POLL_INTERVAL = 0
+        hook = GlueJobHook(job_poll_interval=0)
         get_state_mock.side_effect = [
             "RUNNING",
             "RUNNING",
@@ -368,8 +367,7 @@ class TestGlueJobHook:
 
     @mock.patch.object(GlueJobHook, "get_job_state")
     def test_job_completion_failure(self, get_state_mock: MagicMock):
-        hook = GlueJobHook()
-        hook.JOB_POLL_INTERVAL = 0
+        hook = GlueJobHook(job_poll_interval=0)
         get_state_mock.side_effect = [
             "RUNNING",
             "RUNNING",
@@ -384,8 +382,7 @@ class TestGlueJobHook:
     @pytest.mark.asyncio
     @mock.patch.object(GlueJobHook, "async_get_job_state")
     async def test_async_job_completion_success(self, get_state_mock: MagicMock):
-        hook = GlueJobHook()
-        hook.JOB_POLL_INTERVAL = 0
+        hook = GlueJobHook(job_poll_interval=0)
         get_state_mock.side_effect = [
             "RUNNING",
             "RUNNING",
@@ -400,8 +397,7 @@ class TestGlueJobHook:
     @pytest.mark.asyncio
     @mock.patch.object(GlueJobHook, "async_get_job_state")
     async def test_async_job_completion_failure(self, get_state_mock: MagicMock):
-        hook = GlueJobHook()
-        hook.JOB_POLL_INTERVAL = 0
+        hook = GlueJobHook(job_poll_interval=0)
         get_state_mock.side_effect = [
             "RUNNING",
             "RUNNING",
diff --git a/tests/providers/amazon/aws/triggers/test_glue.py b/tests/providers/amazon/aws/triggers/test_glue.py
index 014658b950..cc98ecc748 100644
--- a/tests/providers/amazon/aws/triggers/test_glue.py
+++ b/tests/providers/amazon/aws/triggers/test_glue.py
@@ -30,12 +30,12 @@ class TestGlueJobTrigger:
     @pytest.mark.asyncio
     @mock.patch.object(GlueJobHook, "async_get_job_state")
     async def test_wait_job(self, get_state_mock: mock.MagicMock):
-        GlueJobHook.JOB_POLL_INTERVAL = 0.1
         trigger = GlueJobCompleteTrigger(
             job_name="job_name",
             run_id="JobRunId",
             verbose=False,
             aws_conn_id="aws_conn_id",
+            job_poll_interval=0.1,
         )
         get_state_mock.side_effect = [
             "RUNNING",
@@ -52,12 +52,12 @@ class TestGlueJobTrigger:
     @pytest.mark.asyncio
     @mock.patch.object(GlueJobHook, "async_get_job_state")
     async def test_wait_job_failed(self, get_state_mock: mock.MagicMock):
-        GlueJobHook.JOB_POLL_INTERVAL = 0.1
         trigger = GlueJobCompleteTrigger(
             job_name="job_name",
             run_id="JobRunId",
             verbose=False,
             aws_conn_id="aws_conn_id",
+            job_poll_interval=0.1,
         )
         get_state_mock.side_effect = [
             "RUNNING",

Reply via email to