pankajkoti commented on code in PR #39110:
URL: https://github.com/apache/airflow/pull/39110#discussion_r1579838560
##########
airflow/providers/databricks/triggers/databricks.py:
##########
@@ -84,13 +84,30 @@ async def run(self):
async with self.hook:
while True:
run_state = await self.hook.a_get_run_state(self.run_id)
+ notebook_error = None
if run_state.is_terminal:
+ if run_state.result_state == "FAILED":
+ run_info = await self.hook.a_get_run(self.run_id)
+ task_run_id = None
+ if "tasks" in run_info:
+ for task in run_info["tasks"]:
+ if task.get("state", {}).get("result_state", "") == "FAILED":
+ task_run_id = task["run_id"]
+ if task_run_id is not None:
Review Comment:
Yes, I think we should correct that, at least in deferrable mode, since we're
adding new code here. As written, the loop keeps only the last failed task's
run_id. We can build a map or list to capture the error logs for all failed
tasks, since we want to show those errors in the output.
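
As a rough illustration of the "map or list" idea, here is a minimal sketch
that collects every failed task instead of overwriting a single
`task_run_id`. The helper name `failed_task_run_ids` is hypothetical, and the
use of a `task_key` field on each task entry is an assumption about the
shape of the run info returned by `a_get_run`; the `tasks`, `state`,
`result_state`, and `run_id` keys are the ones already used in the diff.

```python
from __future__ import annotations

from typing import Any


def failed_task_run_ids(run_info: dict[str, Any]) -> dict[str, int]:
    """Return a mapping of task key -> run_id for every FAILED task.

    Hypothetical helper: `task_key` is assumed to be present on each task;
    when it is missing, the run_id itself is used as the key.
    """
    failed: dict[str, int] = {}
    for task in run_info.get("tasks", []):
        if task.get("state", {}).get("result_state", "") == "FAILED":
            key = task.get("task_key", str(task["run_id"]))
            failed[key] = task["run_id"]
    return failed
```

The trigger could then iterate over the returned mapping and fetch the error
output for each run_id, rather than only for the last failed task seen.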
##########
airflow/providers/databricks/triggers/databricks.py:
##########
@@ -84,21 +84,39 @@ async def run(self):
async with self.hook:
while True:
run_state = await self.hook.a_get_run_state(self.run_id)
+ notebook_error = None
+ if not run_state.is_terminal:
+ self.log.info(
+ "run-id %s in run state %s. sleeping for %s seconds",
+ self.run_id,
+ run_state,
+ self.polling_period_seconds,
+ )
+ await asyncio.sleep(self.polling_period_seconds)
+ continue
+
if run_state.is_terminal:
Review Comment:
We don't need this check: the non-terminal branch above ends with `continue`,
so if the code reaches this point the run is already in a terminal state.
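
To make the control flow concrete, here is a small self-contained sketch of
the early-`continue` pattern. `FakeRunState` and `wait_for_terminal` are
hypothetical stand-ins written for this example, not the provider's actual
classes, and the set of terminal life-cycle states is an assumption.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeRunState:
    # Hypothetical stand-in for the run-state object returned by the hook.
    life_cycle_state: str
    result_state: str = ""

    @property
    def is_terminal(self) -> bool:
        # Assumed set of terminal states, for illustration only.
        return self.life_cycle_state in ("TERMINATED", "SKIPPED", "INTERNAL_ERROR")


async def wait_for_terminal(get_state, polling_period_seconds: float) -> FakeRunState:
    """Poll `get_state` until the run reaches a terminal state."""
    while True:
        run_state = await get_state()
        if not run_state.is_terminal:
            await asyncio.sleep(polling_period_seconds)
            continue
        # Reaching this line implies run_state.is_terminal is True,
        # so a second `if run_state.is_terminal:` would be redundant.
        return run_state
```

With the guard clause at the top of the loop body, the terminal-state handling
can be dedented by one level and the second check dropped.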
##########
airflow/providers/databricks/triggers/databricks.py:
##########
@@ -84,13 +84,30 @@ async def run(self):
async with self.hook:
while True:
run_state = await self.hook.a_get_run_state(self.run_id)
+ notebook_error = None
if run_state.is_terminal:
+ if run_state.result_state == "FAILED":
+ run_info = await self.hook.a_get_run(self.run_id)
+ task_run_id = None
+ if "tasks" in run_info:
+ for task in run_info["tasks"]:
+ if task.get("state", {}).get("result_state", "") == "FAILED":
+ task_run_id = task["run_id"]
+ if task_run_id is not None:
Review Comment:
It would also be good to check whether task["run_id"] is different for each
task in a run, or the same for all of them.