eladkal commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1014591186
##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -221,6 +220,16 @@ def execute(self, context):
return output
+ def on_kill(self) -> None:
+ self.log.info("Stopping:", self._hook.query_ids)
+
+ results = []
+ for query_id in self._hook.query_ids.copy():
+ result = self._hook.stop_query(query_id)
+ results.append(result)
+
+ self.log.info("Termination successful:", results)
Review Comment:
not necessarily.
query may have already completed successfully in the time between so we
don't really know if we succeeded in termination we know only that we requested
termination.
##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -221,6 +220,16 @@ def execute(self, context):
return output
+ def on_kill(self) -> None:
+ self.log.info("Stopping:", self._hook.query_ids)
Review Comment:
here we don't know if we will stop the query as we don't know if the hook
implemented the needed function
##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -294,6 +297,22 @@ def _run_command(self, cur, sql_statement, parameters):
if cur.rowcount >= 0:
self.log.info("Rows affected: %s", cur.rowcount)
+ def _update_query_ids(self, cursor) -> None:
+ """
+ Adds query ids to list
+ :param cur: current cursor after run
+ :return:
+ """
+ return None
+
+ def stop_query(self, query_id) -> Any:
+ """
+ Stops query with certain identifier
+ :param query_id: identifier of the query
+ :return:
+ """
+ return None
Review Comment:
I know we debated this on slack but we should also get some resolution
here...
should we return None or `NotImplementedError`?
we can catch the NotImplementedError in the operator and convert it to INFO
log entry in the log notifying users that their request to kill the query can
not be processed due to lack of implementation.
##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -294,6 +297,22 @@ def _run_command(self, cur, sql_statement, parameters):
if cur.rowcount >= 0:
self.log.info("Rows affected: %s", cur.rowcount)
+ def _update_query_ids(self, cursor) -> None:
+ """
+ Adds query ids to list
+ :param cur: current cursor after run
+ :return:
+ """
+ return None
+
+ def stop_query(self, query_id) -> Any:
Review Comment:
there is some confusing terminology. if we use the term `kill` lets stick
with it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]