This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9510043546 Fix #28391 manual task trigger from UI fails for k8s
executor (#28394)
9510043546 is described below
commit 9510043546d1ac8ac56b67bafa537e4b940d68a4
Author: sanjayp <[email protected]>
AuthorDate: Tue Jan 24 09:18:45 2023 -0600
Fix #28391 manual task trigger from UI fails for k8s executor (#28394)
Manually triggering a task from the UI fails with the k8s executor. The
executor.job_id is currently set to "manual", but the task instance
queued_by_job_id field is expected to be None or an integer. This causes
the filter query in the clear_not_launched_queued_tasks method in
kubernetes_executor to fail with the error
psycopg2.errors.InvalidTextRepresentation: invalid input syntax for
integer: "manual".
Setting the job_id to None fixes the issue.
---
airflow/cli/commands/task_command.py | 2 +-
airflow/executors/kubernetes_executor.py | 2 --
airflow/www/views.py | 2 +-
3 files changed, 2 insertions(+), 4 deletions(-)
diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index 0f3f5a8bba..9ab96e2faa 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -221,7 +221,7 @@ def _run_task_by_executor(args, dag, ti):
print(e)
raise e
executor = ExecutorLoader.get_default_executor()
- executor.job_id = "manual"
+ executor.job_id = None
executor.start()
print("Sending to executor.")
executor.queue_task_instance(
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 040ca21856..e1d7b06a98 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -569,8 +569,6 @@ class KubernetesExecutor(BaseExecutor):
def start(self) -> None:
"""Starts the executor."""
self.log.info("Start Kubernetes executor")
- if not self.job_id:
- raise AirflowException("Could not get scheduler_job_id")
self.scheduler_job_id = str(self.job_id)
self.log.debug("Start with scheduler_job_id: %s",
self.scheduler_job_id)
self.kube_client = get_kube_client()
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 41f3a89b8a..f74d09162c 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1882,7 +1882,7 @@ class Airflow(AirflowBaseView):
msg = f"Could not queue task instance for execution, dependencies not met: {failed_deps_str}"
return redirect_or_json(origin, msg, "error", 400)
- executor.job_id = "manual"
+ executor.job_id = None
executor.start()
executor.queue_task_instance(
ti,