This is an automated email from the ASF dual-hosted git repository.
critas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iotdb-mcp-server.git
The following commit(s) were added to refs/heads/main by this push:
new af0ee71 feat: support for tree model (#3)
af0ee71 is described below
commit af0ee7147d0b85321258f4960e1cedf264eefc5f
Author: xiangmy21 <[email protected]>
AuthorDate: Tue Apr 15 11:47:16 2025 +0800
feat: support for tree model (#3)
* feat: server for tree model
* fix: timestamp
* feat: support for tree model
* fix: readme
* chore
---
.gitignore | 2 +
README.md | 25 +++-
src/iotdb_mcp_server/config.py | 31 +++--
src/iotdb_mcp_server/server.py | 284 +++++++++++++++++++++++++++++++----------
4 files changed, 259 insertions(+), 83 deletions(-)
diff --git a/.gitignore b/.gitignore
index d53d616..ec15ad1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,6 +18,8 @@ uv.lock
# IDE settings (optional)
.vscode/
.idea/
+.specstory/
+.cursor/
# Distribution directories
*.dist-info/
diff --git a/README.md b/README.md
index 0033771..457021f 100644
--- a/README.md
+++ b/README.md
@@ -14,17 +14,30 @@ The server doesn't expose any resources.
The server doesn't provide any prompts.
### Tools
-The server offers three core tools:
+The server offers different tools for IoTDB Tree Model and Table Model. You can choose between them by setting the "IOTDB_SQL_DIALECT" configuration to either "tree" or "table".
-#### Query Tools
+#### Tree Model
+- `metadata_query`
+ - Execute SHOW/COUNT queries to read metadata from the database
+ - Input:
+ - `query_sql` (string): The SHOW/COUNT SQL query to execute
+ - Returns: Query results as array of objects
+- `select_query`
+ - Execute SELECT queries to read data from the database
+ - Input:
+ - `query_sql` (string): The SELECT SQL query to execute
+ - Returns: Query results as array of objects
+
+#### Table Model
+
+##### Query Tools
- `read_query`
- Execute SELECT queries to read data from the database
- Input:
- `query` (string): The SELECT SQL query to execute
- Returns: Query results as array of objects
-
-#### Schema Tools
+##### Schema Tools
- `list_tables`
- Get a list of all tables in the database
- No input required
@@ -37,7 +50,6 @@ The server offers three core tools:
- Returns: Array of column definitions with names and types
-
## Claude Desktop Integration
## Prerequisites
@@ -90,7 +102,8 @@ Location: `%APPDATA%/Claude/claude_desktop_config.json`
"IOTDB_PORT": "6667",
"IOTDB_USER": "root",
"IOTDB_PASSWORD": "root",
- "IOTDB_DATABASE": "test"
+ "IOTDB_DATABASE": "test",
+ "IOTDB_SQL_DIALECT": "table"
}
}
}
diff --git a/src/iotdb_mcp_server/config.py b/src/iotdb_mcp_server/config.py
index db6875a..c571e2b 100644
--- a/src/iotdb_mcp_server/config.py
+++ b/src/iotdb_mcp_server/config.py
@@ -49,7 +49,12 @@ class Config:
database: str
"""
- IoTDB database name
+ IoTDB database
+ """
+
+ sql_dialect: str
+ """
+ SQL dialect: tree or table
"""
@staticmethod
@@ -73,13 +78,6 @@ class Config:
default=os.getenv("IOTDB_PORT", 6667),
)
- parser.add_argument(
- "--database",
- type=str,
- help="IoTDB connect database name",
- default=os.getenv("IOTDB_DATABASE", "test"),
- )
-
parser.add_argument(
"--user",
type=str,
@@ -94,11 +92,26 @@ class Config:
default=os.getenv("IOTDB_PASSWORD", "root"),
)
+ parser.add_argument(
+ "--database",
+ type=str,
+ help="IoTDB connect database name",
+ default=os.getenv("IOTDB_DATABASE", "test"),
+ )
+
+ parser.add_argument(
+ "--sql-dialect",
+ type=str,
+ help="SQL dialect: tree or table",
+ default=os.getenv("IOTDB_SQL_DIALECT", "table"),
+ )
+
args = parser.parse_args()
return Config(
host=args.host,
port=args.port,
- database=args.database,
user=args.user,
password=args.password,
+ database=args.database,
+ sql_dialect=args.sql_dialect,
)
diff --git a/src/iotdb_mcp_server/server.py b/src/iotdb_mcp_server/server.py
index 0527bc2..7e4b9ff 100644
--- a/src/iotdb_mcp_server/server.py
+++ b/src/iotdb_mcp_server/server.py
@@ -17,10 +17,13 @@
#
import logging
+import datetime
+from iotdb.Session import Session
+from iotdb.SessionPool import SessionPool, PoolConfig
+from iotdb.utils.SessionDataSet import SessionDataSet
from iotdb.table_session import TableSession
from iotdb.table_session_pool import TableSessionPool, TableSessionPoolConfig
-from iotdb.utils.SessionDataSet import SessionDataSet
from mcp.server.fastmcp import FastMCP
from mcp.types import (
TextContent,
@@ -46,85 +49,230 @@ db_config = {
"user": config.user,
"password": config.password,
"database": config.database,
+ "sql_dialect": config.sql_dialect,
}
logger.info(f"IoTDB Config: {db_config}")
-session_pool_config = TableSessionPoolConfig(
- node_urls=[str(config.host) + ":" + str(config.port)],
- username=config.user,
- password=config.password,
- database=None if len(config.database) == 0 else config.database,
-)
+if config.sql_dialect == "tree":
+
+ pool_config = PoolConfig(
+ node_urls=[str(config.host) + ":" + str(config.port)],
+ user_name=config.user,
+ password=config.password,
+ fetch_size=1024,
+ time_zone="UTC+8",
+ max_retry=3
+ )
+ max_pool_size = 5
+ wait_timeout_in_ms = 3000
+ session_pool = SessionPool(pool_config, max_pool_size, wait_timeout_in_ms)
+
+ @mcp.tool()
+ async def metadata_query(query_sql: str) -> list[TextContent]:
+ """Execute metadata queries on IoTDB to explore database structure and
statistics.
+
+ Args:
+ query_sql: The metadata query to execute. Supported queries:
+ - SHOW DATABASES [path]: List all databases or databases under a specific path
+ - SHOW TIMESERIES [path]: List all time series or time series under a specific path
+ - SHOW CHILD PATHS [path]: List child paths under a specific path
+ - SHOW CHILD NODES [path]: List child nodes under a specific path
+ - SHOW DEVICES [path]: List all devices or devices under a specific path
+ - COUNT TIMESERIES [path]: Count time series under a specific path
+ - COUNT NODES [path]: Count nodes under a specific path
+ - COUNT DEVICES [path]: Count devices under a specific path
+ - if path is not provided, the query will be applied to root.**
+
+ Examples:
+ SHOW DATABASES root.**
+ SHOW TIMESERIES root.ln.**
+ SHOW CHILD PATHS root.ln
+ SHOW CHILD PATHS root.ln.*.*
+ SHOW CHILD NODES root.ln
+ SHOW DEVICES root.ln.**
+ COUNT TIMESERIES root.ln.**
+ COUNT NODES root.ln
+ COUNT DEVICES root.ln
+ """
+ session = session_pool.get_session()
+ try:
+ stmt = query_sql.strip().upper()
+
+ # Handle SHOW DATABASES and the other supported SHOW/COUNT metadata queries
+ if (
+ stmt.startswith("SHOW DATABASES")
+ or stmt.startswith("SHOW TIMESERIES")
+ or stmt.startswith("SHOW CHILD PATHS")
+ or stmt.startswith("SHOW CHILD NODES")
+ or stmt.startswith("SHOW DEVICES")
+ or stmt.startswith("COUNT TIMESERIES")
+ or stmt.startswith("COUNT NODES")
+ or stmt.startswith("COUNT DEVICES")
+ ):
+ res = session.execute_query_statement(query_sql)
+ return prepare_res(res, session)
+ else:
+ raise ValueError("Unsupported metadata query. Please use one of the supported query types.")
+
+ except Exception as e:
+ session.close()
+ raise e
+
+ @mcp.tool()
+ async def select_query(query_sql: str) -> list[TextContent]:
+ """Execute a SELECT query on the IoTDB tree SQL dialect.
+
+ Args:
+ query_sql: The SQL query to execute (using TREE dialect)
+
+ SQL Syntax:
+ SELECT [LAST] selectExpr [, selectExpr] ...
+ [INTO intoItem [, intoItem] ...]
+ FROM prefixPath [, prefixPath] ...
+ [WHERE whereCondition]
+ [GROUP BY {
+ ([startTime, endTime), interval [, slidingStep]) |
+ LEVEL = levelNum [, levelNum] ... |
+ TAGS(tagKey [, tagKey] ...) |
+ VARIATION(expression[,delta][,ignoreNull=true/false]) |
+ CONDITION(expression,[keep>/>=/=/</<=]threshold[,ignoreNull=true/false]) |
+ SESSION(timeInterval) |
+ COUNT(expression, size[,ignoreNull=true/false])
+ }]
+ [HAVING havingCondition]
+ [ORDER BY sortKey {ASC | DESC}]
+ [FILL ({PREVIOUS | LINEAR | constant}) (, interval=DURATION_LITERAL)?)]
+ [SLIMIT seriesLimit] [SOFFSET seriesOffset]
+ [LIMIT rowLimit] [OFFSET rowOffset]
+ [ALIGN BY {TIME | DEVICE}]
+
+ Examples:
+ select temperature from root.ln.wf01.wt01 where time < 2017-11-01T00:08:00.000
+ select status, temperature from root.ln.wf01.wt01 where (time > 2017-11-01T00:05:00.000 and time < 2017-11-01T00:12:00.000) or (time >= 2017-11-01T16:35:00.000 and time <= 2017-11-01T16:37:00.000)
+ select * from root.ln.** where time > 1 order by time desc limit 10;
-session_pool = TableSessionPool(session_pool_config)
+ Supported Aggregate Functions:
+ SUM
+ COUNT
+ MAX_VALUE
+ MIN_VALUE
+ AVG
+ VARIANCE
+ MAX_TIME
+ MIN_TIME
+ ...
+ """
+ session = session_pool.get_session()
+ res = session.execute_query_statement(query_sql)
+ stmt = query_sql.strip().upper()
+ # Regular SELECT queries
+ if (
+ stmt.startswith("SELECT")
+ ):
+ return prepare_res(res, session)
+ # Non-SELECT queries
+ else:
+ raise ValueError("Only SELECT queries are allowed for read_query")
-@mcp.tool()
-async def read_query(query_sql: str) -> list[TextContent]:
- """Execute a SELECT query on the IoTDB. Please use table sql_dialect when
generating SQL queries.
+ def prepare_res(
+ _res: SessionDataSet, _session: Session
+ ) -> list[TextContent]:
+ columns = _res.get_column_names()
+ result = []
+ while _res.has_next():
+ record = _res.next()
+ if columns[0] == "Time":
+ timestamp = record.get_timestamp()
+ # formatted_time = datetime.datetime.fromtimestamp(timestamp/1000).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]
+ row = record.get_fields()
+ result.append(timestamp + "," + ",".join(map(str, row)))
+ else:
+ row = record.get_fields()
+ result.append(",".join(map(str, row)))
+ _session.close()
+ return [
+ TextContent(
+ type="text",
+ text="\n".join([",".join(columns)] + result),
+ )
+ ]
- Args:
- query_sql: The SQL query to execute (using TABLE dialect)
- """
- table_session = session_pool.get_session()
- res = table_session.execute_query_statement(query_sql)
+elif config.sql_dialect == "table":
+
+ session_pool_config = TableSessionPoolConfig(
+ node_urls=[str(config.host) + ":" + str(config.port)],
+ username=config.user,
+ password=config.password,
+ database=None if len(config.database) == 0 else config.database,
+ )
+ session_pool = TableSessionPool(session_pool_config)
+
+ @mcp.tool()
+ async def read_query(query_sql: str) -> list[TextContent]:
+ """Execute a SELECT query on the IoTDB. Please use table sql_dialect
when generating SQL queries.
+
+ Args:
+ query_sql: The SQL query to execute (using TABLE dialect)
+ """
+ table_session = session_pool.get_session()
+ res = table_session.execute_query_statement(query_sql)
+
+ stmt = query_sql.strip().upper()
+ # Regular SELECT queries
+ if (
+ stmt.startswith("SELECT")
+ or stmt.startswith("DESCRIBE")
+ or stmt.startswith("SHOW")
+ ):
+ return prepare_res(res, table_session)
+ # Non-SELECT queries
+ else:
+ raise ValueError("Only SELECT queries are allowed for read_query")
+
+
+ @mcp.tool()
+ async def list_tables() -> list[TextContent]:
+ """List all tables in the IoTDB database."""
+ table_session = session_pool.get_session()
+ res = table_session.execute_query_statement("SHOW TABLES")
+
+ result = ["Tables_in_" + db_config["database"]] # Header
+ while res.has_next():
+ result.append(str(res.next().get_fields()[0]))
+ table_session.close()
+ return [TextContent(type="text", text="\n".join(result))]
+
+
+ @mcp.tool()
+ async def describe_table(table_name: str) -> list[TextContent]:
+ """Get the schema information for a specific table
+ Args:
+ table_name: name of the table to describe
+ """
+ table_session = session_pool.get_session()
+ res = table_session.execute_query_statement("DESC " + table_name)
- stmt = query_sql.strip().upper()
- # Regular SELECT queries
- if (
- stmt.startswith("SELECT")
- or stmt.startswith("DESCRIBE")
- or stmt.startswith("SHOW")
- ):
return prepare_res(res, table_session)
- # Non-SELECT queries
- else:
- raise ValueError("Only SELECT queries are allowed for read_query")
-
-
-@mcp.tool()
-async def list_tables() -> list[TextContent]:
- """List all tables in the IoTDB database."""
- table_session = session_pool.get_session()
- res = table_session.execute_query_statement("SHOW TABLES")
-
- result = ["Tables_in_" + db_config["database"]] # Header
- while res.has_next():
- result.append(str(res.next().get_fields()[0]))
- table_session.close()
- return [TextContent(type="text", text="\n".join(result))]
-
-
-@mcp.tool()
-async def describe_table(table_name: str) -> list[TextContent]:
- """Get the schema information for a specific table
- Args:
- table_name: name of the table to describe
- """
- table_session = session_pool.get_session()
- res = table_session.execute_query_statement("DESC " + table_name)
-
- return prepare_res(res, table_session)
-
-
-def prepare_res(
- _res: SessionDataSet, _table_session: TableSession
-) -> list[TextContent]:
- columns = _res.get_column_names()
- result = []
- while _res.has_next():
- row = _res.next().get_fields()
- result.append(",".join(map(str, row)))
- _table_session.close()
- return [
- TextContent(
- type="text",
- text="\n".join([",".join(columns)] + result),
- )
- ]
+ def prepare_res(
+ _res: SessionDataSet, _table_session: TableSession
+ ) -> list[TextContent]:
+ columns = _res.get_column_names()
+ result = []
+ while _res.has_next():
+ row = _res.next().get_fields()
+ result.append(",".join(map(str, row)))
+ _table_session.close()
+ return [
+ TextContent(
+ type="text",
+ text="\n".join([",".join(columns)] + result),
+ )
+ ]
+
if __name__ == "__main__":
logger.info("iotdb_mcp_server running with stdio transport")
# Initialize and run the server