ferruzzi commented on code in PR #62343:
URL: https://github.com/apache/airflow/pull/62343#discussion_r2967728694
##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -168,6 +177,84 @@ def _execute_callback(log: Logger, workload:
workloads.ExecuteCallback, team_con
raise RuntimeError(error_msg or "Callback execution failed")
+def _execute_connection_test(log: Logger, workload: workloads.TestConnection,
team_conf) -> None:
+ """
+ Execute a connection test workload.
+
+ Constructs an SDK ``Client``, fetches the connection via the Execution API,
+ enforces a timeout via ``signal.alarm``, and reports all outcomes back
+ through the Execution API.
+
+ :param log: Logger instance
+ :param workload: The TestConnection workload to execute
+ :param team_conf: Team-specific executor configuration
+ """
+ # Lazy import: SDK modules must not be loaded at module level to avoid
+ # coupling core (scheduler-loaded) code to the SDK.
+ from airflow.sdk.api.client import Client
+ from airflow.sdk.execution_time.comms import ErrorResponse
+
+ setproctitle(
+ f"{_get_executor_process_title_prefix(team_conf.team_name)}
connection-test {workload.connection_id}",
+ log,
+ )
+
+ base_url = team_conf.get("api", "base_url", fallback="/")
+ if base_url.startswith("/"):
+ base_url = f"http://localhost:8080{base_url}"
+ default_execution_api_server = f"{base_url.rstrip('/')}/execution/"
+ server = team_conf.get("core", "execution_api_server_url",
fallback=default_execution_api_server)
+
+ client = Client(base_url=server, token=workload.token)
+
+ def _handle_timeout(signum, frame):
+ raise TimeoutError(f"Connection test timed out after
{workload.timeout}s")
+
+ signal.signal(signal.SIGALRM, _handle_timeout)
+ signal.alarm(workload.timeout)
+ try:
+ client.connection_tests.update_state(workload.connection_test_id,
ConnectionTestState.RUNNING)
+
+ conn_response = client.connections.get(workload.connection_id)
+ if isinstance(conn_response, ErrorResponse):
+ raise RuntimeError(f"Connection '{workload.connection_id}' not
found via Execution API")
+
+ conn = Connection(
+ conn_id=conn_response.conn_id,
+ conn_type=conn_response.conn_type,
+ host=conn_response.host,
+ login=conn_response.login,
+ password=conn_response.password,
+ schema=conn_response.schema_,
+ port=conn_response.port,
+ extra=conn_response.extra,
+ )
+ success, message = run_connection_test(conn=conn)
+
+ state = ConnectionTestState.SUCCESS if success else
ConnectionTestState.FAILED
+ client.connection_tests.update_state(workload.connection_test_id,
state, message)
+ except TimeoutError:
+ log.error(
+ "Connection test timed out after %ds",
+ workload.timeout,
+ connection_id=workload.connection_id,
+ )
+ client.connection_tests.update_state(
+ workload.connection_test_id,
+ ConnectionTestState.FAILED,
+ f"Connection test timed out after {workload.timeout}s",
+ )
+ except Exception as e:
+ log.exception("Connection test failed unexpectedly",
connection_id=workload.connection_id)
+ client.connection_tests.update_state(
+ workload.connection_test_id,
+ ConnectionTestState.FAILED,
Review Comment:
This one looks alright to me, he's embedding the connection id, not the
error message.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]