This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch release-1.8.0
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/release-1.8.0 by this push:
new 4d386b9ab2 feat Apache Linkis MCP (#5252)
4d386b9ab2 is described below
commit 4d386b9ab236cfd2ef3cc756c11eb340d75af761
Author: Oliver Jay <[email protected]>
AuthorDate: Thu Oct 9 10:40:48 2025 +0800
feat Apache Linkis MCP (#5252)
---
linkis-mcp-server/auth.py | 11 +
linkis-mcp-server/config.json | 13 +
linkis-mcp-server/config.py | 59 +++++
linkis-mcp-server/datasource.py | 19 ++
linkis-mcp-server/knowledge.py | 326 ++++++++++++++++++++++++++
linkis-mcp-server/linkis_client.py | 130 ++++++++++
linkis-mcp-server/mcp_server.py | 220 +++++++++++++++++
linkis-mcp-server/metadata.py | 30 +++
linkis-mcp-server/readme.md | 74 ++++++
linkis-mcp-server/requirements.txt | 3 +
linkis-mcp-server/tasks.py | 53 +++++
linkis-mcp-server/test/__init__.py | 0
linkis-mcp-server/test/test_auth_api.py | 13 +
linkis-mcp-server/test/test_datasource_api.py | 30 +++
linkis-mcp-server/test/test_knowledge_api.py | 20 ++
linkis-mcp-server/test/test_linkis_client.py | 24 ++
linkis-mcp-server/test/test_mcp_tools.py | 14 ++
linkis-mcp-server/test/test_metadata_api.py | 51 ++++
linkis-mcp-server/test/test_tasks_api.py | 69 ++++++
19 files changed, 1159 insertions(+)
diff --git a/linkis-mcp-server/auth.py b/linkis-mcp-server/auth.py
new file mode 100644
index 0000000000..3f3137a170
--- /dev/null
+++ b/linkis-mcp-server/auth.py
@@ -0,0 +1,11 @@
+from typing import Dict, Any
+from linkis_client import LinkisClient
+from config import API_PATHS
+
+class AuthAPI:
+ def __init__(self, client: LinkisClient):
+ self.client = client
+
+ def login(self, username: str, password: str) -> Dict[str, Any]:
+ payload = {"userName": username, "password": password}
+ return self.client.post(API_PATHS["login"], payload)
diff --git a/linkis-mcp-server/config.json b/linkis-mcp-server/config.json
new file mode 100644
index 0000000000..3345a974bb
--- /dev/null
+++ b/linkis-mcp-server/config.json
@@ -0,0 +1,13 @@
+{
+ "current_env": "dev",
+ "dev": {
+ "LINKIS_BASE_URL": "http://localhost:9001",
+ "LINKIS_TOKEN": "your_dev_token",
+ "LINKIS_ENV": "dev"
+ },
+ "prod": {
+ "LINKIS_BASE_URL": "http://localhost:9001",
+ "LINKIS_TOKEN": "your_prod_token",
+ "LINKIS_ENV": "prod"
+ }
+}
diff --git a/linkis-mcp-server/config.py b/linkis-mcp-server/config.py
new file mode 100644
index 0000000000..c167d7b71c
--- /dev/null
+++ b/linkis-mcp-server/config.py
@@ -0,0 +1,59 @@
+import json
+import os
+
+CONFIG_PATH = os.path.join(os.path.dirname(__file__), "config.json")
+
+if not os.path.exists(CONFIG_PATH):
+    raise FileNotFoundError(f"Config file not found: {CONFIG_PATH}")
+
+with open(CONFIG_PATH, "r", encoding="utf-8") as f:
+ all_cfg = json.load(f)
+
+env_mode = all_cfg.get("current_env", "dev")
+if env_mode not in all_cfg:
+ raise ValueError(f"未知配置环境: {env_mode},可选值: {list(all_cfg.keys())}")
+
+cfg = all_cfg[env_mode]
+
+LINKIS_BASE_URL = cfg.get("LINKIS_BASE_URL")
+LINKIS_TOKEN = cfg.get("LINKIS_TOKEN")
+LINKIS_ENV = cfg.get("LINKIS_ENV", env_mode)
+
+if not LINKIS_BASE_URL:
+ raise ValueError(f"[{env_mode}] LINKIS_BASE_URL 未配置")
+if not LINKIS_TOKEN:
+ raise ValueError(f"[{env_mode}] LINKIS_TOKEN 未配置")
+
+# Centralized registry of Linkis API paths
+API_PATHS = {
+ # Auth
+ "login": "/api/rest_j/v1/user/login",
+
+ # Metadata
+ "get_columns": "/api/rest_j/v1/metadataQuery/getColumns",
+ "get_databases": "/api/rest_j/v1/metadataQuery/getDatabases",
+ "get_tables": "/api/rest_j/v1/metadataQuery/getTables",
+ "get_partitions": "/api/rest_j/v1/metadataQuery/getPartitions",
+ "get_table_props": "/api/rest_j/v1/metadataQuery/getTableProps",
+ "get_partition_props": "/api/rest_j/v1/metadataQuery/getPartitionProps",
+
+ # DataSource
+ "displaysql": "/api/rest_j/v1/datasource/displaysql",
+ "get_table_fields_info": "/api/rest_j/v1/datasource/getTableFieldsInfo",
+ "get_table_statistic_info":
"/api/rest_j/v1/datasource/getTableStatisticInfo",
+
+ # Tasks
+ "execute": "/api/rest_j/v1/entrance/execute",
+ "submit": "/api/rest_j/v1/entrance/submit",
+ "kill": "/api/rest_j/v1/entrance/{id}/kill",
+ "kill_jobs": "/api/rest_j/v1/entrance/{id}/killJobs",
+ "pause": "/api/rest_j/v1/entrance/{id}/pause",
+ "progress": "/api/rest_j/v1/entrance/{id}/progress",
+ "progress_with_resource":
"/api/rest_j/v1/entrance/{id}/progressWithResource",
+ "status": "/api/rest_j/v1/entrance/{id}/status",
+ "runningtask": "/api/rest_j/v1/entrance/api/metrics/runningtask",
+ "taskinfo": "/api/rest_j/v1/entrance/api/metrics/taskinfo"
+}
diff --git a/linkis-mcp-server/datasource.py b/linkis-mcp-server/datasource.py
new file mode 100644
index 0000000000..bb23e7e91f
--- /dev/null
+++ b/linkis-mcp-server/datasource.py
@@ -0,0 +1,19 @@
+from typing import Dict, Any
+from linkis_client import LinkisClient
+from config import API_PATHS
+
+class DataSourceAPI:
+ def __init__(self, client: LinkisClient):
+ self.client = client
+
+ def displaysql(self, datasource_id: int, sql: str) -> Dict[str, Any]:
+ payload = {"id": datasource_id, "sql": sql}
+ return self.client.post(API_PATHS["displaysql"], payload)
+
+    def get_table_fields_info(self, datasource_id: int, table: str) -> Dict[str, Any]:
+ params = {"id": datasource_id, "table": table}
+ return self.client.get(API_PATHS["get_table_fields_info"], params)
+
+    def get_table_statistic_info(self, datasource_id: int, table: str) -> Dict[str, Any]:
+ params = {"id": datasource_id, "table": table}
+ return self.client.get(API_PATHS["get_table_statistic_info"], params)
diff --git a/linkis-mcp-server/knowledge.py b/linkis-mcp-server/knowledge.py
new file mode 100644
index 0000000000..4182fe3cdf
--- /dev/null
+++ b/linkis-mcp-server/knowledge.py
@@ -0,0 +1,326 @@
+from typing import List, Dict, Optional
+
+class KnowledgeEntry:
+ def __init__(self, id: str, question: str, answer: str,
+                 tags: Optional[List[str]] = None, aliases: Optional[List[str]] = None):
+ self.id = id
+ self.question = question
+ self.answer = answer
+ self.tags = tags or []
+ self.aliases = aliases or []
+
+ def matches(self, query: str) -> bool:
+ query_lower = query.lower()
+ return (query_lower in self.question.lower() or
+ any(query_lower in alias.lower() for alias in self.aliases) or
+ query_lower == self.id.lower())
+
+
+class KnowledgeQA:
+ def __init__(self):
+ self._entries: List[KnowledgeEntry] = []
+ self._load_entries()
+
+ def _load_entries(self):
+ raw_data = [
+ {
+ "id": "api-entrance-submit-sql",
+ "question": "如何提交一条 SQL 任务",
+                "answer": "POST /api/rest_j/v1/entrance/submit\n必填: executionContent.code=SQL文本, executionContent.runType=sql, labels.engineType(如 hive-1.2.1), labels.userCreator(如 IDE_user)",
+ "tags": ["api", "task", "sql"],
+ "aliases": ["提交SQL任务", "SQL任务提交接口", "SQL 提交"]
+ },
+ {
+ "id": "api-entrance-submit-script",
+ "question": "如何提交脚本任务并指定语言",
+                "answer": "POST /api/rest_j/v1/entrance/submit\nexecutionContent.code=脚本, executionContent.runType=python/shell/spark, labels.engineType 与引擎匹配",
+ "tags": ["api", "task", "script"],
+ "aliases": ["提交脚本", "提交 Python 任务", "提交 Shell 任务"]
+ },
+ {
+ "id": "api-entrance-status",
+ "question": "如何查询任务状态",
+                "answer": "GET /api/rest_j/v1/entrance/{execID}/status\n路径参数: execID=任务ID",
+ "tags": ["api", "task", "status"],
+ "aliases": ["任务状态查询", "查询状态"]
+ },
+ {
+ "id": "api-entrance-progress",
+ "question": "如何查询任务进度",
+                "answer": "GET /api/rest_j/v1/entrance/{execID}/progress\n返回阶段进度与总进度",
+ "tags": ["api", "task", "progress"],
+ "aliases": ["任务进度", "进度查询"]
+ },
+ {
+ "id": "api-entrance-log",
+ "question": "如何获取任务日志",
+                "answer": "GET /api/rest_j/v1/entrance/{execID}/log?fromLine=1&size=200\n参数: fromLine 起始行, size 行数",
+ "tags": ["api", "task", "log"],
+ "aliases": ["查看日志", "拉取日志"]
+ },
+ {
+ "id": "api-entrance-result-list",
+ "question": "如何列出任务的结果集",
+                "answer": "GET /api/rest_j/v1/entrance/{execID}/resultset\n返回结果集索引与基本信息",
+ "tags": ["api", "task", "result"],
+ "aliases": ["结果集列表", "列出结果集"]
+ },
+ {
+ "id": "api-entrance-result",
+ "question": "如何获取任务结果集",
+                "answer": "GET /api/rest_j/v1/entrance/{execID}/resultset/{index}?size=500\n参数: index 结果集序号, size 返回行数",
+ "tags": ["api", "task", "result"],
+ "aliases": ["下载结果集", "查看结果集"]
+ },
+ {
+ "id": "api-entrance-kill",
+ "question": "如何杀死任务",
+ "answer": "POST /api/rest_j/v1/entrance/{execID}/kill",
+ "tags": ["api", "task", "kill"],
+ "aliases": ["终止任务", "取消任务", "停止任务"]
+ },
+ {
+ "id": "api-metadata-databases",
+ "question": "如何查询数据源下的数据库列表",
+                "answer": "GET /api/rest_j/v1/metadataQuery/getDatabases?dataSourceName=xxx&system=hive",
+ "tags": ["api", "metadata", "database"],
+ "aliases": ["获取数据库列表", "列出数据库"]
+ },
+ {
+ "id": "api-metadata-tables",
+ "question": "如何查询数据库下的表列表",
+                "answer": "GET /api/rest_j/v1/metadataQuery/getTables?dataSourceName=xxx&database=xxx&system=hive",
+ "tags": ["api", "metadata", "table"],
+ "aliases": ["获取表列表", "列出表"]
+ },
+ {
+ "id": "api-metadata-columns",
+ "question": "如何查询表的字段信息",
+                "answer": "GET /api/rest_j/v1/metadataQuery/getColumns?dataSourceName=xxx&database=xxx&table=xxx&system=hive",
+ "tags": ["api", "metadata", "column"],
+ "aliases": ["获取表结构", "查看字段信息"]
+ },
+ {
+ "id": "api-metadata-partitions",
+ "question": "如何查询表的分区信息",
+                "answer": "GET /api/rest_j/v1/metadataQuery/getPartitions?dataSourceName=xxx&database=xxx&table=xxx&system=hive",
+ "tags": ["api", "metadata", "partition"],
+ "aliases": ["查看分区", "分区信息查询"]
+ },
+ {
+ "id": "api-metadata-tableinfo",
+ "question": "如何获取表的详细信息",
+                "answer": "GET /api/rest_j/v1/metadataQuery/getTableInfo?dataSourceName=xxx&database=xxx&table=xxx&system=hive",
+ "tags": ["api", "metadata", "table"],
+ "aliases": ["表信息", "表详情"]
+ },
+ {
+ "id": "api-datasource-list",
+ "question": "如何获取已注册数据源列表",
+ "answer": "GET /api/rest_j/v1/datasource/getAll",
+ "tags": ["api", "datasource", "list"],
+ "aliases": ["列出数据源", "获取数据源列表"]
+ },
+ {
+ "id": "api-datasource-detail",
+ "question": "如何根据 ID 获取数据源详情",
+ "answer": "GET /api/rest_j/v1/datasource/get?id=123",
+ "tags": ["api", "datasource", "detail"],
+ "aliases": ["数据源详情", "查询数据源"]
+ },
+ {
+ "id": "api-datasource-search",
+ "question": "如何按名称搜索数据源",
+ "answer": "GET /api/rest_j/v1/datasource/search?name=xxx",
+ "tags": ["api", "datasource", "search"],
+ "aliases": ["搜索数据源", "按名称查数据源"]
+ },
+ {
+ "id": "api-datasource-create",
+ "question": "如何创建数据源",
+                "answer": "POST /api/rest_j/v1/datasource/create\nBody: name, type(如 hive/mysql), connectParams(主机/端口/库/用户名等), labels(可选)",
+ "tags": ["api", "datasource", "create"],
+ "aliases": ["新建数据源", "注册数据源"]
+ },
+ {
+ "id": "api-datasource-update",
+ "question": "如何更新数据源配置",
+                "answer": "POST /api/rest_j/v1/datasource/update\nBody: id, 需更新的字段,如 connectParams 或 labels",
+ "tags": ["api", "datasource", "update"],
+ "aliases": ["修改数据源", "编辑数据源"]
+ },
+ {
+ "id": "api-datasource-delete",
+ "question": "如何删除数据源",
+                "answer": "POST /api/rest_j/v1/datasource/delete\nBody: id 或 ids 列表",
+ "tags": ["api", "datasource", "delete"],
+ "aliases": ["移除数据源", "删除数据源"]
+ },
+ {
+ "id": "api-datasource-test",
+ "question": "如何测试数据源连通性",
+                "answer": "POST /api/rest_j/v1/datasource/testConnect\nBody: type, connectParams(与创建一致)",
+ "tags": ["api", "datasource", "test"],
+ "aliases": ["测试连接", "连通性测试"]
+ },
+ {
+ "id": "api-datasource-displaysql",
+ "question": "如何生成建表 DDL",
+                "answer": "POST /api/rest_j/v1/datasource/displaysql\nBody 可包含 table/schema 字段,用于生成建库建表 DDL",
+ "tags": ["api", "datasource", "ddl"],
+ "aliases": ["生成DDL", "建表语句"]
+ },
+ {
+ "id": "api-jobhistory-list",
+ "question": "如何分页查询任务历史",
+                "answer": "GET /api/rest_j/v1/jobhistory/list?user=xxx&pageNow=1&pageSize=20",
+ "tags": ["api", "task", "history"],
+ "aliases": ["任务历史查询", "任务列表"]
+ },
+ {
+ "id": "api-jobhistory-detail",
+ "question": "如何获取历史任务详情",
+ "answer": "GET /api/rest_j/v1/jobhistory/{jobID}",
+ "tags": ["api", "task", "history"],
+ "aliases": ["任务详情", "历史任务详情"]
+ },
+ {
+ "id": "cfg-engine-type",
+ "question": "labels.engineType 如何配置",
+                "answer": "labels.engineType 需与部署的引擎标识一致,如 hive-1.2.1, spark-2.4.7。示例: {\"labels\":{\"engineType\":\"hive-1.2.1\",\"userCreator\":\"IDE_user\"}}",
+ "tags": ["config", "engine"],
+ "aliases": ["engineType 配置", "引擎类型标签"]
+ },
+ {
+ "id": "cfg-user-creator",
+ "question": "labels.userCreator 有什么作用",
+ "answer": "用于标识请求来源或创建者类型,例如 IDE_user、scheduler_xxx,便于路由和审计",
+ "tags": ["config", "labels"],
+ "aliases": ["userCreator 配置", "用户创建者标签"]
+ },
+ {
+ "id": "cfg-timeout",
+ "question": "任务超时时间如何设置",
+                "answer": "可通过 runtime.max.askExecutorTimes 或 engineConn.timeout 控制任务超时(毫秒),具体取决于你的引擎与网关实现",
+ "tags": ["config", "timeout"],
+ "aliases": ["设置超时", "执行超时配置"]
+ },
+ {
+ "id": "cfg-queue",
+ "question": "如何指定资源队列",
+                "answer": "通常通过 labels.queue 或执行参数中设置队列名(如 YARN 队列),也可在引擎侧配置默认队列",
+ "tags": ["config", "resource"],
+ "aliases": ["设置队列", "资源队列配置"]
+ },
+ {
+ "id": "cfg-script-path",
+ "question": "如何记录脚本来源路径",
+                "answer": "可在 source.scriptPath 中传入源脚本路径(如 /tmp/demo.sql),便于审计与追踪",
+ "tags": ["config", "source"],
+ "aliases": ["scriptPath", "脚本路径"]
+ },
+ {
+ "id": "cfg-vars",
+ "question": "如何在任务中传递变量",
+ "answer": "可通过 params.variable 传递键值对变量,由引擎侧在执行前进行替换/注入",
+ "tags": ["config", "params"],
+ "aliases": ["任务变量", "参数变量"]
+ },
+ {
+ "id": "err-11001",
+ "question": "Linkis 报错 11001 是什么原因",
+ "answer": "11001 通常表示认证失败或未登录。请检查登录状态、凭据有效性以及网关鉴权配置",
+ "tags": ["error", "auth"],
+ "aliases": ["错误码 11001", "未登录 11001"]
+ },
+ {
+ "id": "err-12001",
+ "question": "参数错误 12001 如何处理",
+ "answer": "检查必填参数是否缺失,类型是否正确(如 envId 应为字符串),并参考接口文档修正",
+ "tags": ["error", "params"],
+ "aliases": ["错误码 12001", "参数错误 12001"]
+ },
+ {
+ "id": "err-13002",
+ "question": "执行引擎不可用 13002 怎么办",
+                "answer": "该错误表示目标执行引擎未启动或标签不匹配,请检查引擎服务状态、资源队列与 labels.engineType",
+ "tags": ["error", "engine"],
+ "aliases": ["错误码 13002", "引擎不可用"]
+ },
+ {
+ "id": "err-14001",
+ "question": "权限不足 14001 如何排查",
+ "answer": "确认调用用户是否具备操作权限(数据源、数据库、表级权限),检查网关与后端鉴权策略",
+ "tags": ["error", "permission"],
+ "aliases": ["错误码 14001", "无权限 14001"]
+ },
+ {
+ "id": "err-15001",
+ "question": "任务执行超时 15001 如何处理",
+                "answer": "适当提升超时阈值(如 engineConn.timeout),优化 SQL/脚本,或检查资源队列拥塞情况",
+ "tags": ["error", "timeout"],
+ "aliases": ["错误码 15001", "执行超时 15001"]
+ },
+ {
+ "id": "err-16001",
+ "question": "资源不足 16001 如何处理",
+ "answer": "检查队列/集群资源是否充足,调大资源配额或调整并发度,必要时更换空闲队列",
+ "tags": ["error", "resource"],
+ "aliases": ["错误码 16001", "资源不足 16001"]
+ },
+ {
+ "id": "howto-no-resultset",
+ "question": "提交成功但没有结果集怎么办",
+                "answer": "确认语句是否产生结果(如 DDL/DML 通常无结果集),检查 resultset 索引列表与权限限制",
+ "tags": ["howto", "result"],
+ "aliases": ["无结果集", "没有结果"]
+ },
+ {
+ "id": "howto-paginate-result",
+ "question": "如何分页获取大结果集",
+ "answer": "通过 size 控制每次拉取行数,并循环拉取;若支持 offset/nextToken,可按游标分页",
+ "tags": ["howto", "result"],
+ "aliases": ["结果集分页", "分页获取结果"]
+ },
+ {
+ "id": "howto-retry",
+ "question": "任务失败如何重试",
+ "answer": "可重新提交同一脚本/SQL;若系统支持重试 API,优先使用;同时排查失败原因(日志、资源、权限)",
+ "tags": ["howto", "task"],
+ "aliases": ["失败重试", "任务重试"]
+ },
+ {
+ "id": "howto-choose-engine",
+ "question": "如何选择合适的引擎",
+                "answer": "按任务类型与生态匹配引擎(如 SQL 选 hive/spark-sql,批处理选 spark,脚本选 python/shell),并配置对应 engineType",
+ "tags": ["howto", "engine"],
+ "aliases": ["选择引擎", "引擎如何选"]
+ },
+ ]
+
+ self._entries = [KnowledgeEntry(**item) for item in raw_data]
+
+    def ask(self, query: str, top_k: int = 1) -> List[Dict]:
+        matches = [e for e in self._entries if e.matches(query)]
+        results = [{
+            "id": e.id,
+            "question": e.question,
+            "answer": e.answer,
+            "tags": e.tags,
+            "aliases": e.aliases
+        } for e in matches]
+        return results[:top_k]
+
+    def search_by_tag(self, tag: str) -> List[Dict]:
+        tag_lower = tag.lower()
+        matches = [e for e in self._entries if tag_lower in (t.lower() for t in e.tags)]
+        return [{
+            "id": e.id,
+            "question": e.question,
+            "answer": e.answer,
+            "tags": e.tags,
+            "aliases": e.aliases
+        } for e in matches]
+
+    def add_entry(self, entry: Dict, save: bool = False) -> KnowledgeEntry:
+        # Runtime additions are kept in memory only; `save` is accepted for API
+        # compatibility with the qa_add tool, but no persistence backend exists yet.
+        data = dict(entry)
+        data.setdefault("id", f"custom-{len(self._entries) + 1}")
+        new_entry = KnowledgeEntry(**data)
+        self._entries.append(new_entry)
+        return new_entry
+
+    def topics(self) -> List[str]:
+        # Distinct tags across all entries, exposed through the qa_topics tool.
+        return sorted({t for e in self._entries for t in e.tags})
+
+    def reload(self):
+        # Rebuild the built-in entries, discarding runtime additions.
+        self._entries = []
+        self._load_entries()
\ No newline at end of file
diff --git a/linkis-mcp-server/linkis_client.py b/linkis-mcp-server/linkis_client.py
new file mode 100644
index 0000000000..d9e36d6309
--- /dev/null
+++ b/linkis-mcp-server/linkis_client.py
@@ -0,0 +1,130 @@
+from __future__ import annotations
+
+import json
+from typing import Any, Dict, Optional, Union
+
+import requests
+
+from config import LINKIS_BASE_URL, LINKIS_TOKEN, LINKIS_ENV
+
+class LinkisError(Exception):
+    def __init__(self, message: str, status: Optional[int] = None, payload: Any = None):
+ super().__init__(message)
+ self.status = status
+ self.payload = payload
+
+ def __str__(self) -> str:
+ base = super().__str__()
+ if self.status is not None:
+ base += f" (HTTP {self.status})"
+ if self.payload is not None:
+ base += f" | payload={self.payload}"
+ return base
+
+
+def _join_url(base_url: str, path: str) -> str:
+ return f"{base_url.rstrip('/')}/{path.lstrip('/')}"
+
+
+def _clean_params(d: Optional[Dict[str, Any]]) -> Dict[str, Any]:
+ return {k: v for k, v in (d or {}).items() if v is not None}
+
+
+def _interpolate_path(path_template: str, path_params: Optional[Dict[str, Any]]) -> str:
+ if not path_params:
+ return path_template
+ out = path_template
+ for k, v in path_params.items():
+ out = out.replace(f"{{{k}}}", requests.utils.quote(str(v), safe=""))
+ return out
+
+
+def _safe_resp_payload(resp: requests.Response) -> Any:
+ try:
+ return resp.json()
+ except Exception:
+ return resp.text
+
+
+class LinkisClient:
+
+ def __init__(
+ self,
+ base_url: Optional[str] = None,
+ token: Optional[str] = None,
+ username: Optional[str] = None,
+ password: Optional[str] = None,
+ timeout: Optional[Union[int, float]] = None,
+ verify_ssl: Optional[bool] = True,
+ extra_headers: Optional[Dict[str, str]] = None,
+ ):
+ self.base_url = (base_url or LINKIS_BASE_URL).strip()
+ if not self.base_url:
+ raise ValueError("LINKIS_BASE_URL 未配置")
+
+        self.token = (token or LINKIS_TOKEN).strip() if (token or LINKIS_TOKEN) else None
+ self.username = username
+ self.password = password
+ self.timeout = float(timeout if timeout is not None else 30)
+ self.verify_ssl = verify_ssl
+
+ self._session = requests.Session()
+ self._session.headers.update({"Accept": "*/*"})
+ if self.token:
+            self._session.headers.update({"Authorization": f"Bearer {self.token}"})
+ if extra_headers:
+ self._session.headers.update(extra_headers)
+
+ if self.username and self.password:
+ try:
+ self.login(self.username, self.password)
+ except Exception as e:
+ print(f"[LinkisClient] Auto login failed: {e}")
+
+ def login(self, user_name: str, password: str) -> Dict[str, Any]:
+ path = "/api/rest_j/v1/user/login"
+ url = _join_url(self.base_url, path)
+ payload = {"userName": user_name, "password": password}
+
+        resp = self._session.post(url, json=payload, timeout=self.timeout, verify=self.verify_ssl)
+ if not resp.ok:
+ raise LinkisError("Login failed", status=resp.status_code,
payload=_safe_resp_payload(resp))
+
+ data = _safe_resp_payload(resp)
+ return data
+
+ def get(
+ self,
+ path: str,
+ params: Optional[Dict[str, Any]] = None,
+ path_params: Optional[Dict[str, Any]] = None,
+ ) -> Any:
+ url = _join_url(self.base_url, _interpolate_path(path, path_params))
+        resp = self._session.get(url, params=_clean_params(params), timeout=self.timeout, verify=self.verify_ssl)
+ return self._handle_response(resp)
+
+ def post(
+ self,
+ path: str,
+ json_body: Optional[Dict[str, Any]] = None,
+ path_params: Optional[Dict[str, Any]] = None,
+ ) -> Any:
+ url = _join_url(self.base_url, _interpolate_path(path, path_params))
+ headers = {"Content-Type": "application/json"}
+ resp = self._session.post(
+ url,
+ data=None if json_body is None else json.dumps(json_body),
+ headers=headers,
+ timeout=self.timeout,
+ verify=self.verify_ssl,
+ )
+ return self._handle_response(resp)
+
+ def _handle_response(self, resp: requests.Response) -> Any:
+ if not resp.ok:
+ raise LinkisError("Request failed", status=resp.status_code,
payload=_safe_resp_payload(resp))
+
+ try:
+ return resp.json()
+ except ValueError:
+ return resp.text
diff --git a/linkis-mcp-server/mcp_server.py b/linkis-mcp-server/mcp_server.py
new file mode 100644
index 0000000000..439cfb7b6c
--- /dev/null
+++ b/linkis-mcp-server/mcp_server.py
@@ -0,0 +1,220 @@
+from mcp.server.fastmcp import FastMCP
+
+from config import LINKIS_TOKEN, LINKIS_ENV
+from linkis_client import LinkisClient
+from auth import AuthAPI
+from metadata import MetadataAPI
+from datasource import DataSourceAPI
+from tasks import TaskAPI
+from knowledge import KnowledgeQA
+
+mcp = FastMCP("Linkis MCP Server")
+
+# Initialize the Linkis client and the API modules
+client = LinkisClient(token=LINKIS_TOKEN)
+
+auth_api = AuthAPI(client)
+metadata_api = MetadataAPI(client)
+datasource_api = DataSourceAPI(client)
+task_api = TaskAPI(client)
+qa = KnowledgeQA()
+
+
+def _ok(data):
+ return {"ok": True, "data": data}
+
+
+def _err(e: Exception):
+ return {"ok": False, "error": str(e)}
+
+
+# ========== Auth ==========
[email protected]()
+def login(username: str, password: str):
+ """用户登录,获取/验证凭据"""
+ try:
+ return _ok(auth_api.login(username, password))
+ except Exception as e:
+ return _err(e)
+
+
+# ========== Metadata ==========
[email protected]()
+def get_databases():
+ try:
+ return _ok(metadata_api.get_databases())
+ except Exception as e:
+ return _err(e)
+
[email protected]()
+def get_tables(database: str):
+ try:
+ return _ok(metadata_api.get_tables(database))
+ except Exception as e:
+ return _err(e)
+
[email protected]()
+def get_columns(database: str, table: str):
+ try:
+ return _ok(metadata_api.get_columns(database, table))
+ except Exception as e:
+ return _err(e)
+
[email protected]()
+def get_partitions(database: str, table: str):
+ try:
+ return _ok(metadata_api.get_partitions(database, table))
+ except Exception as e:
+ return _err(e)
+
[email protected]()
+def get_table_props(database: str, table: str):
+ try:
+ return _ok(metadata_api.get_table_props(database, table))
+ except Exception as e:
+ return _err(e)
+
[email protected]()
+def get_partition_props(database: str, table: str, partition: str):
+ try:
+        return _ok(metadata_api.get_partition_props(database, table, partition))
+ except Exception as e:
+ return _err(e)
+
+
+# ========== DataSource ==========
[email protected]()
+def displaysql(datasource_id: int, sql: str):
+ try:
+ return _ok(datasource_api.displaysql(datasource_id, sql))
+ except Exception as e:
+ return _err(e)
+
[email protected]()
+def get_table_fields_info(datasource_id: int, table: str):
+ try:
+ return _ok(datasource_api.get_table_fields_info(datasource_id, table))
+ except Exception as e:
+ return _err(e)
+
[email protected]()
+def get_table_statistic_info(datasource_id: int, table: str):
+ try:
+        return _ok(datasource_api.get_table_statistic_info(datasource_id, table))
+ except Exception as e:
+ return _err(e)
+
+
+# ========== Tasks ==========
[email protected]()
+def execute(code: str, execute_user: str, engine_type: str = "spark"):
+ try:
+ return _ok(task_api.execute(code, execute_user, engine_type))
+ except Exception as e:
+ return _err(e)
+
[email protected]()
+def submit(code: str, execute_user: str, engine_type: str = "spark"):
+ try:
+ return _ok(task_api.submit(code, execute_user, engine_type))
+ except Exception as e:
+ return _err(e)
+
[email protected]()
+def status(exec_id: str):
+ try:
+ return _ok(task_api.status(exec_id))
+ except Exception as e:
+ return _err(e)
+
[email protected]()
+def progress(exec_id: str):
+ try:
+ return _ok(task_api.progress(exec_id))
+ except Exception as e:
+ return _err(e)
+
[email protected]()
+def progress_with_resource(exec_id: str):
+ try:
+ return _ok(task_api.progress_with_resource(exec_id))
+ except Exception as e:
+ return _err(e)
+
[email protected]()
+def pause(exec_id: str):
+ try:
+ return _ok(task_api.pause(exec_id))
+ except Exception as e:
+ return _err(e)
+
[email protected]()
+def kill(exec_id: str):
+ try:
+ return _ok(task_api.kill(exec_id))
+ except Exception as e:
+ return _err(e)
+
[email protected]()
+def kill_jobs(exec_id: str):
+ try:
+ return _ok(task_api.kill_jobs(exec_id))
+ except Exception as e:
+ return _err(e)
+
[email protected]()
+def runningtask():
+ try:
+ return _ok(task_api.runningtask())
+ except Exception as e:
+ return _err(e)
+
[email protected]()
+def taskinfo():
+ try:
+ return _ok(task_api.taskinfo())
+ except Exception as e:
+ return _err(e)
+
+
+# ========== Knowledge QA ==========
[email protected]()
+def qa_ask(question: str, top_k: int = 3):
+ try:
+ return _ok(qa.ask(question, top_k=top_k))
+ except Exception as e:
+ return _err(e)
+
[email protected]()
+def qa_add(question: str, answer: str, tags: str = "", aliases: str = ""):
+ try:
+ entry = {
+ "question": question,
+ "answer": answer,
+ "tags": [t.strip() for t in tags.split(",") if t.strip()],
+ "aliases": [a.strip() for a in aliases.split(",") if a.strip()],
+ }
+ created = qa.add_entry(entry, save=True)
+ return _ok({"entry": created.__dict__})
+ except Exception as e:
+ return _err(e)
+
[email protected]()
+def qa_topics():
+ try:
+ return _ok({"topics": qa.topics()})
+ except Exception as e:
+ return _err(e)
+
[email protected]()
+def qa_reload():
+ try:
+ qa.reload()
+ return _ok({"reloaded": True})
+ except Exception as e:
+ return _err(e)
+
+
+if __name__ == "__main__":
+ print(f"[MCP] Starting Linkis MCP Server | ENV={LINKIS_ENV}")
+ mcp.run()
diff --git a/linkis-mcp-server/metadata.py b/linkis-mcp-server/metadata.py
new file mode 100644
index 0000000000..5202127367
--- /dev/null
+++ b/linkis-mcp-server/metadata.py
@@ -0,0 +1,30 @@
+from typing import Dict, Any
+from linkis_client import LinkisClient
+from config import API_PATHS
+
+class MetadataAPI:
+ def __init__(self, client: LinkisClient):
+ self.client = client
+
+ def get_columns(self, database: str, table: str) -> Dict[str, Any]:
+ params = {"database": database, "table": table}
+ return self.client.get(API_PATHS["get_columns"], params)
+
+ def get_databases(self) -> Dict[str, Any]:
+ return self.client.get(API_PATHS["get_databases"])
+
+ def get_tables(self, database: str) -> Dict[str, Any]:
+ params = {"database": database}
+ return self.client.get(API_PATHS["get_tables"], params)
+
+ def get_partitions(self, database: str, table: str) -> Dict[str, Any]:
+ params = {"database": database, "table": table}
+ return self.client.get(API_PATHS["get_partitions"], params)
+
+ def get_table_props(self, database: str, table: str) -> Dict[str, Any]:
+ params = {"database": database, "table": table}
+ return self.client.get(API_PATHS["get_table_props"], params)
+
+    def get_partition_props(self, database: str, table: str, partition: str) -> Dict[str, Any]:
+ params = {"database": database, "table": table, "partition": partition}
+ return self.client.get(API_PATHS["get_partition_props"], params)
diff --git a/linkis-mcp-server/readme.md b/linkis-mcp-server/readme.md
new file mode 100644
index 0000000000..76d3380455
--- /dev/null
+++ b/linkis-mcp-server/readme.md
@@ -0,0 +1,74 @@
+# 📌 Linkis MCP Server
+
+## 📖 Overview
+**Linkis MCP Server** is a server-side implementation based on the **Model Context Protocol (MCP)**.
+It wraps core **Linkis** capabilities (metadata management, data source management, task scheduling and monitoring) behind a standardized interface and exposes them as **MCP tools**.
+This lets AI agents, automation scripts, and other clients call, integrate, and extend Linkis functionality through a single protocol.
+
+---
+
+## 🎯 Design Goals
+- **Unified API layer**: reduces the complexity of calling the Linkis REST API directly
+- **Modular structure**: easy to extend with new feature modules
+- **Robust error handling**: keeps production deployments stable
+- **AI-friendly**: a natural fit for AI agents and other intelligent systems
+
+---
+
+## 🛠 Features
+- **Authentication**: wraps the login / credential flow
+- **Metadata API**: query databases, tables, columns, and partitions
+- **Data source API**: manage, create, update, and delete data sources
+- **Task API**: task submission, status queries, progress monitoring, task killing, and more
+- **Standardized MCP tools**: easy to call across applications
+- **Unified error handling**: every tool returns the same response format, which simplifies debugging and integration (see the sketch after this list)
+
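+Every tool wraps its result with the `_ok` / `_err` helpers in `mcp_server.py`. A minimal sketch of the two envelope shapes (the payload values below are illustrative only):
+
+```python
+# Success: the Linkis response is passed through under "data"
+ok_example = {"ok": True, "data": {"status": 0, "message": "OK"}}
+
+# Failure: the exception text is surfaced under "error"
+err_example = {"ok": False, "error": "Request failed (HTTP 404)"}
+```
+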
+---
+
+## 🏗 Architecture
+```
+linkis-mcp-server/
+├── mcp_server.py        # MCP server entry point; registers all tools
+├── linkis_client.py     # low-level HTTP client
+├── auth.py              # authentication / login module
+├── metadata.py          # metadata API
+├── knowledge.py         # knowledge Q&A
+├── datasource.py        # data source API
+├── tasks.py             # task and compute governance API
+├── config.py            # configuration management (environments, constants)
+├── requirements.txt     # Python dependencies
+├── readme.md            # project documentation
+└── test/                # unit tests
+    ├── test_linkis_client.py
+    ├── test_auth_api.py
+    ├── test_metadata_api.py
+    ├── test_datasource_api.py
+    ├── test_tasks_api.py
+    ├── test_knowledge_api.py
+    └── test_mcp_tools.py
+
+```
+---
+
+## 📦 Installation & Run
+
+```bash
+# 1. Clone the repository
+git clone https://your-repo-url.git
+cd linkis-mcp-server
+
+# 2. Create and activate a virtual environment
+python -m venv venv
+source venv/bin/activate   # macOS / Linux
+venv\Scripts\activate      # Windows
+
+# 3. Install dependencies
+pip install --upgrade pip
+pip install -r requirements.txt
+
+# 4. Configure the environment
+# Set the Linkis address, port, token, etc. in config.json
+nano config.json
+
+# 5. Start the MCP server
+python mcp_server.py
+```
+
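+Once configured, a quick sanity check can be run directly against the modules added here (a sketch: the knowledge lookup works offline, while the metadata call needs the Linkis gateway configured in `config.json` to be reachable):
+
+```python
+from knowledge import KnowledgeQA
+from linkis_client import LinkisClient
+from metadata import MetadataAPI
+
+# Offline lookup against the built-in knowledge base
+qa = KnowledgeQA()
+print(qa.ask("如何提交一条 SQL 任务", top_k=1))
+
+# These calls require a running Linkis instance at LINKIS_BASE_URL
+client = LinkisClient()          # reads LINKIS_BASE_URL / LINKIS_TOKEN from config.json
+metadata = MetadataAPI(client)
+print(metadata.get_databases())
+```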
diff --git a/linkis-mcp-server/requirements.txt b/linkis-mcp-server/requirements.txt
new file mode 100644
index 0000000000..4e632947c1
--- /dev/null
+++ b/linkis-mcp-server/requirements.txt
@@ -0,0 +1,3 @@
+pytest~=7.4.0
+mcp~=1.12.2
+requests~=2.32.4
\ No newline at end of file
diff --git a/linkis-mcp-server/tasks.py b/linkis-mcp-server/tasks.py
new file mode 100644
index 0000000000..823afb804c
--- /dev/null
+++ b/linkis-mcp-server/tasks.py
@@ -0,0 +1,53 @@
+from typing import Dict, Any
+from linkis_client import LinkisClient
+from config import API_PATHS
+
+class TaskAPI:
+ def __init__(self, client: LinkisClient):
+ self.client = client
+
+    def execute(self, code: str, execute_user: str, engine_type: str = "spark") -> Dict[str, Any]:
+ payload = {
+ "executeUser": execute_user,
+ "executionCode": code,
+ "engineType": engine_type
+ }
+ return self.client.post(API_PATHS["execute"], payload)
+
+    def submit(self, code: str, execute_user: str, engine_type: str = "spark") -> Dict[str, Any]:
+ payload = {
+ "executeUser": execute_user,
+ "executionCode": code,
+ "engineType": engine_type
+ }
+ return self.client.post(API_PATHS["submit"], payload)
+
+ def kill(self, exec_id: str) -> Dict[str, Any]:
+ path = API_PATHS["kill"].format(id=exec_id)
+ return self.client.get(path)
+
+ def kill_jobs(self, exec_id: str) -> Dict[str, Any]:
+ path = API_PATHS["kill_jobs"].format(id=exec_id)
+ return self.client.get(path)
+
+ def pause(self, exec_id: str) -> Dict[str, Any]:
+ path = API_PATHS["pause"].format(id=exec_id)
+ return self.client.get(path)
+
+ def progress(self, exec_id: str) -> Dict[str, Any]:
+ path = API_PATHS["progress"].format(id=exec_id)
+ return self.client.get(path)
+
+ def progress_with_resource(self, exec_id: str) -> Dict[str, Any]:
+ path = API_PATHS["progress_with_resource"].format(id=exec_id)
+ return self.client.get(path)
+
+ def status(self, exec_id: str) -> Dict[str, Any]:
+ path = API_PATHS["status"].format(id=exec_id)
+ return self.client.get(path)
+
+ def runningtask(self) -> Dict[str, Any]:
+ return self.client.get(API_PATHS["runningtask"])
+
+ def taskinfo(self) -> Dict[str, Any]:
+ return self.client.get(API_PATHS["taskinfo"])
diff --git a/linkis-mcp-server/test/__init__.py b/linkis-mcp-server/test/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/linkis-mcp-server/test/test_auth_api.py b/linkis-mcp-server/test/test_auth_api.py
new file mode 100644
index 0000000000..7da9dde2df
--- /dev/null
+++ b/linkis-mcp-server/test/test_auth_api.py
@@ -0,0 +1,13 @@
+import pytest
+from config import LINKIS_BASE_URL, LINKIS_TOKEN
+from linkis_client import LinkisClient
+from auth import AuthAPI
+
[email protected](scope="module")
+def auth_api():
+ client = LinkisClient(base_url=LINKIS_BASE_URL, token=LINKIS_TOKEN)
+ return AuthAPI(client)
+
+def test_login_fail(auth_api):
+ with pytest.raises(Exception):
+ auth_api.login("fake_user", "wrong_password")
diff --git a/linkis-mcp-server/test/test_datasource_api.py b/linkis-mcp-server/test/test_datasource_api.py
new file mode 100644
index 0000000000..db8f0f8697
--- /dev/null
+++ b/linkis-mcp-server/test/test_datasource_api.py
@@ -0,0 +1,30 @@
+import pytest
+from config import LINKIS_BASE_URL, LINKIS_TOKEN
+from linkis_client import LinkisClient
+from datasource import DataSourceAPI
+
[email protected](scope="module")
+def datasource_api():
+ client = LinkisClient(base_url=LINKIS_BASE_URL, token=LINKIS_TOKEN)
+ return DataSourceAPI(client)
+
+def test_displaysql(datasource_api):
+ try:
+ res = datasource_api.displaysql(1, "CREATE TABLE test (id INT)")
+ assert isinstance(res, dict)
+ except Exception as e:
+ pytest.skip(f"displaysql skipped: {e}")
+
+def test_get_table_fields_info(datasource_api):
+ try:
+ res = datasource_api.get_table_fields_info(1, "some_table")
+ assert isinstance(res, list)
+ except Exception as e:
+ pytest.skip(f"get_table_fields_info skipped: {e}")
+
+def test_get_table_statistic_info(datasource_api):
+ try:
+ res = datasource_api.get_table_statistic_info(1, "some_table")
+ assert isinstance(res, dict)
+ except Exception as e:
+ pytest.skip(f"get_table_statistic_info skipped: {e}")
diff --git a/linkis-mcp-server/test/test_knowledge_api.py b/linkis-mcp-server/test/test_knowledge_api.py
new file mode 100644
index 0000000000..a4c4bd2884
--- /dev/null
+++ b/linkis-mcp-server/test/test_knowledge_api.py
@@ -0,0 +1,20 @@
+import pytest
+from knowledge import KnowledgeQA
+
[email protected](scope="module")
+def qa():
+ return KnowledgeQA()
+
+def test_qa_add_and_ask(qa):
+ qa.add_entry({
+ "question": "测试问题",
+ "answer": "测试答案",
+ "tags": ["test"],
+ "aliases": ["测试别名"]
+ }, save=False)
+ results = qa.ask("测试问题", top_k=1)
+ assert results and results[0]["answer"] == "测试答案"
+
+def test_qa_topics(qa):
+ topics = qa.topics()
+ assert isinstance(topics, list)
diff --git a/linkis-mcp-server/test/test_linkis_client.py b/linkis-mcp-server/test/test_linkis_client.py
new file mode 100644
index 0000000000..eca8246880
--- /dev/null
+++ b/linkis-mcp-server/test/test_linkis_client.py
@@ -0,0 +1,24 @@
+import pytest
+from config import LINKIS_BASE_URL, LINKIS_TOKEN
+from linkis_client import LinkisClient, LinkisError
+
+
[email protected](scope="module")
+def client():
+ return LinkisClient(base_url=LINKIS_BASE_URL, token=LINKIS_TOKEN)
+
+
+def test_client_init(client):
+ assert client.base_url == LINKIS_BASE_URL
+ assert client.token == LINKIS_TOKEN
+ assert hasattr(client, "_session")
+
+
+def test_client_get_fail(client):
+ with pytest.raises(LinkisError):
+ client.get("/api/not_exist_path")
+
+
+def test_client_post_fail(client):
+ with pytest.raises(LinkisError):
+ client.post("/api/not_exist_path", json_body={})
diff --git a/linkis-mcp-server/test/test_mcp_tools.py b/linkis-mcp-server/test/test_mcp_tools.py
new file mode 100644
index 0000000000..2a0c4b86bf
--- /dev/null
+++ b/linkis-mcp-server/test/test_mcp_tools.py
@@ -0,0 +1,14 @@
+import asyncio
+
+import mcp_server
+
+def test_mcp_tools_exist():
+    # FastMCP.list_tools() is a coroutine; run it to completion for this synchronous test
+    tools = asyncio.run(mcp_server.mcp.list_tools())
+ expected_tools = [
+ "login", "get_databases", "get_tables", "get_columns",
"get_partitions",
+ "get_table_props", "get_partition_props", "displaysql",
+ "get_table_fields_info", "get_table_statistic_info", "execute",
"submit",
+ "status", "progress", "progress_with_resource", "pause", "kill",
"kill_jobs",
+ "runningtask", "taskinfo", "qa_ask", "qa_add", "qa_topics", "qa_reload"
+ ]
+ for tool in expected_tools:
+ assert any(t.name == tool for t in tools)
diff --git a/linkis-mcp-server/test/test_metadata_api.py b/linkis-mcp-server/test/test_metadata_api.py
new file mode 100644
index 0000000000..451c0adf4f
--- /dev/null
+++ b/linkis-mcp-server/test/test_metadata_api.py
@@ -0,0 +1,51 @@
+import pytest
+from config import LINKIS_BASE_URL, LINKIS_TOKEN
+from linkis_client import LinkisClient
+from metadata import MetadataAPI
+
[email protected](scope="module")
+def metadata_api():
+ client = LinkisClient(base_url=LINKIS_BASE_URL, token=LINKIS_TOKEN)
+ return MetadataAPI(client)
+
+def test_get_databases(metadata_api):
+ try:
+ dbs = metadata_api.get_databases()
+ assert isinstance(dbs, list)
+ except Exception as e:
+ pytest.skip(f"get_databases skipped: {e}")
+
+def test_get_tables(metadata_api):
+ try:
+ tables = metadata_api.get_tables("default")
+ assert isinstance(tables, list)
+ except Exception as e:
+ pytest.skip(f"get_tables skipped: {e}")
+
+def test_get_columns(metadata_api):
+ try:
+ cols = metadata_api.get_columns("default", "some_table")
+ assert isinstance(cols, list)
+ except Exception as e:
+ pytest.skip(f"get_columns skipped: {e}")
+
+def test_get_partitions(metadata_api):
+ try:
+        parts = metadata_api.get_partitions("default", "some_partitioned_table")
+ assert isinstance(parts, list)
+ except Exception as e:
+ pytest.skip(f"get_partitions skipped: {e}")
+
+def test_get_table_props(metadata_api):
+ try:
+ props = metadata_api.get_table_props("default", "some_table")
+ assert isinstance(props, dict)
+ except Exception as e:
+ pytest.skip(f"get_table_props skipped: {e}")
+
+def test_get_partition_props(metadata_api):
+ try:
+        props = metadata_api.get_partition_props("default", "some_partitioned_table", "part_col=1")
+ assert isinstance(props, dict)
+ except Exception as e:
+ pytest.skip(f"get_partition_props skipped: {e}")
diff --git a/linkis-mcp-server/test/test_tasks_api.py b/linkis-mcp-server/test/test_tasks_api.py
new file mode 100644
index 0000000000..9f902c704b
--- /dev/null
+++ b/linkis-mcp-server/test/test_tasks_api.py
@@ -0,0 +1,69 @@
+import pytest
+from config import LINKIS_BASE_URL, LINKIS_TOKEN
+from linkis_client import LinkisClient
+from tasks import TaskAPI
+
[email protected](scope="module")
+def task_api():
+ client = LinkisClient(base_url=LINKIS_BASE_URL, token=LINKIS_TOKEN)
+ return TaskAPI(client)
+
+def test_execute(task_api):
+ try:
+ res = task_api.execute("SELECT 1", "test_user")
+ assert isinstance(res, dict)
+ except Exception as e:
+ pytest.skip(f"execute skipped: {e}")
+
+def test_submit_status_kill(task_api):
+ try:
+ sub = task_api.submit("SELECT 1", "test_user")
+ assert isinstance(sub, dict)
+ exec_id = sub.get("execID") or sub.get("execId")
+ if exec_id:
+ st = task_api.status(exec_id)
+ assert isinstance(st, dict)
+ task_api.kill(exec_id)
+ except Exception as e:
+ pytest.skip(f"submit/status/kill skipped: {e}")
+
+def test_progress_methods(task_api):
+ try:
+ sub = task_api.submit("SELECT 1", "test_user")
+ exec_id = sub.get("execID") or sub.get("execId")
+ if exec_id:
+ prog = task_api.progress(exec_id)
+ prog_res = task_api.progress_with_resource(exec_id)
+ assert isinstance(prog, dict)
+ assert isinstance(prog_res, dict)
+ except Exception as e:
+ pytest.skip(f"progress skipped: {e}")
+
+def test_pause(task_api):
+ try:
+ sub = task_api.submit("SELECT 1", "test_user")
+ exec_id = sub.get("execID") or sub.get("execId")
+ if exec_id:
+ res = task_api.pause(exec_id)
+ assert isinstance(res, dict)
+ except Exception as e:
+ pytest.skip(f"pause skipped: {e}")
+
+def test_kill_jobs(task_api):
+ try:
+ sub = task_api.submit("SELECT 1", "test_user")
+ exec_id = sub.get("execID") or sub.get("execId")
+ if exec_id:
+ res = task_api.kill_jobs(exec_id)
+ assert isinstance(res, dict)
+ except Exception as e:
+ pytest.skip(f"kill_jobs skipped: {e}")
+
+def test_runningtask_taskinfo(task_api):
+ try:
+ run_info = task_api.runningtask()
+ ti = task_api.taskinfo()
+ assert isinstance(run_info, dict)
+ assert isinstance(ti, dict)
+ except Exception as e:
+ pytest.skip(f"runningtask/taskinfo skipped: {e}")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]