uranusjr commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2674290887


##########
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##########
@@ -217,13 +219,33 @@ def execute(self, context: Context):
 
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)
+                if previous_job_run_id:
+                    try:
+                        job_run = self.hook.conn.get_job_run(JobName=self.job_name, RunId=previous_job_run_id)
+                        state = job_run.get("JobRun", {}).get("JobRunState")
+                        self.log.info("Previous Glue job_run_id: %s, state: %s", previous_job_run_id, state)
+                        if state in ("RUNNING", "STARTING", "STOPPING"):
+                            self._job_run_id = previous_job_run_id
+                    except Exception as e:
+                        self.log.warning("Failed to get previous Glue job run state: %s", e)
+
+        if not self._job_run_id:
+            self.log.info(
+                "Initializing AWS Glue Job: %s. Wait for completion: %s",
+                self.job_name,
+                self.wait_for_completion,
+            )
+            glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
+            self._job_run_id = glue_job_run["JobRunId"]
+            ti = context.get("ti")
+            if ti:
+                ti.xcom_push(key="glue_job_run_id", value=self._job_run_id)

Review Comment:
   This seems overly defensive to me; `context["ti"]` should always be available, so the `context.get("ti")` lookups and the `if ti:` guards are unnecessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to