shahar1 commented on code in PR #41412:
URL: https://github.com/apache/airflow/pull/41412#discussion_r1717304462


##########
airflow/providers/databricks/operators/databricks.py:
##########
@@ -107,6 +107,22 @@ def _handle_databricks_operator_execution(operator, hook, 
log, context) -> None:
                         error_message,
                     )
 
+                    repair_reason = next(
+                        (
+                            reason
+                            for reason in 
(operator.databricks_repair_reason_new_settings or {}).keys()
+                            if reason in run_state.state_message
+                        ),
+                        None,
+                    )
+
+                    if repair_reason is not None and 
operator.databricks_repair_reason_new_settings:
+                        new_settings_json = normalise_json_content(
+                            
operator.databricks_repair_reason_new_settings[repair_reason]
+                        )
+                        log.warning("Repairing the run with new_settings json: 
%s", new_settings_json)
+                        hook.update_job(job_id=operator.json["job_id"], 
json=new_settings_json)
+

Review Comment:
   Could you please try adding tests for this part?



##########
airflow/providers/databricks/operators/databricks.py:
##########
@@ -872,7 +896,23 @@ def execute_complete(self, context: Context, event: 
dict[str, Any] | None = None
             _handle_deferrable_databricks_operator_completion(event, self.log)
             if event["repair_run"]:
                 self.repair_run = False
+                run_state = RunState.from_json(event["run_state"])
                 self.run_id = event["run_id"]
+                job_id = self._hook.get_job_id(self.run_id)
+                repair_reason = next(
+                    (
+                        reason
+                        for reason in 
(self.databricks_repair_reason_new_settings or {}).keys()
+                        if reason in run_state.state_message
+                    ),
+                    None,
+                )
+                if repair_reason is not None and 
self.databricks_repair_reason_new_settings:
+                    new_settings_json = normalise_json_content(
+                        
self.databricks_repair_reason_new_settings[repair_reason]
+                    )
+                    self._hook.update_job(job_id=str(job_id), 
json=new_settings_json)

Review Comment:
   1. Same comment regarding adding tests.
   2. Looks repetitive - maybe it's worth extracting into a util 
function/static method and reuse it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to