This is an automated email from the ASF dual-hosted git repository.
freeoneplus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-mcp-server.git
The following commit(s) were added to refs/heads/master by this push:
new c1ce9a5 [Config]Delete the minimum data pool variable (#11)
c1ce9a5 is described below
commit c1ce9a5cc7e1f73221bb9062069a8da5d4d8f4d9
Author: Yijia Su <[email protected]>
AuthorDate: Wed Jul 2 19:57:45 2025 +0800
[Config]Delete the minimum data pool variable (#11)
* fix at_eof bug
* update uv.lock
* fix bug and change pool min values
---
.env.example | 3 +-
doris_mcp_server/utils/config.py | 23 +++---
doris_mcp_server/utils/db.py | 7 +-
doris_mcp_server/utils/query_executor.py | 121 ++++++++++++++-----------------
4 files changed, 72 insertions(+), 82 deletions(-)
diff --git a/.env.example b/.env.example
index 0c632a1..03f9e47 100644
--- a/.env.example
+++ b/.env.example
@@ -28,7 +28,6 @@ DORIS_BE_WEBSERVER_PORT=8040
# Connection Pool Configuration
# =============================================================================
-DORIS_MIN_CONNECTIONS=5
DORIS_MAX_CONNECTIONS=20
DORIS_CONNECTION_TIMEOUT=30
DORIS_HEALTH_CHECK_INTERVAL=60
@@ -86,5 +85,5 @@ ALERT_WEBHOOK_URL=
# =============================================================================
SERVER_NAME=doris-mcp-server
-SERVER_VERSION=0.4.1
+SERVER_VERSION=0.4.2
SERVER_PORT=3000
diff --git a/doris_mcp_server/utils/config.py b/doris_mcp_server/utils/config.py
index 4f566b4..d48d118 100644
--- a/doris_mcp_server/utils/config.py
+++ b/doris_mcp_server/utils/config.py
@@ -53,12 +53,19 @@ class DatabaseConfig:
be_webserver_port: int = 8040
# Connection pool configuration
- min_connections: int = 5
+ # Note: min_connections is fixed at 0 to avoid at_eof connection issues
+ # This prevents pre-creation of connections which can cause state problems
+ _min_connections: int = field(default=0, init=False) # Internal use only, always 0
max_connections: int = 20
connection_timeout: int = 30
health_check_interval: int = 60
max_connection_age: int = 3600
+ @property
+ def min_connections(self) -> int:
+ """Minimum connections is always 0 to prevent at_eof issues"""
+ return self._min_connections
+
@dataclass
class SecurityConfig:
@@ -248,9 +255,6 @@ class DorisConfig:
config.database.be_webserver_port = int(os.getenv("DORIS_BE_WEBSERVER_PORT", str(config.database.be_webserver_port)))
# Connection pool configuration
- config.database.min_connections = int(
- os.getenv("DORIS_MIN_CONNECTIONS", str(config.database.min_connections))
- )
config.database.max_connections = int(
os.getenv("DORIS_MAX_CONNECTIONS",
str(config.database.max_connections))
)
@@ -414,7 +418,7 @@ class DorisConfig:
"fe_http_port": self.database.fe_http_port,
"be_hosts": self.database.be_hosts,
"be_webserver_port": self.database.be_webserver_port,
- "min_connections": self.database.min_connections,
+ "min_connections": self.database.min_connections, # Always 0, shown for reference
"max_connections": self.database.max_connections,
"connection_timeout": self.database.connection_timeout,
"health_check_interval": self.database.health_check_interval,
@@ -492,11 +496,8 @@ class DorisConfig:
if not self.database.user:
errors.append("Database username cannot be empty")
- if self.database.min_connections <= 0:
- errors.append("Minimum connections must be greater than 0")
-
- if self.database.max_connections <= self.database.min_connections:
- errors.append("Maximum connections must be greater than minimum connections")
+ if self.database.max_connections <= 0:
+ errors.append("Maximum connections must be greater than 0")
# Validate security configuration
if self.security.auth_type not in ["token", "basic", "oauth"]:
@@ -549,7 +550,7 @@ class DorisConfig:
return {
"server": f"{self.server_name} v{self.server_version}",
"database":
f"{self.database.host}:{self.database.port}/{self.database.database}",
- "connection_pool": f"{self.database.min_connections}-{self.database.max_connections}",
+ "connection_pool": f"0-{self.database.max_connections} (min fixed at 0 for stability)",
"security": {
"auth_type": self.security.auth_type,
"masking_enabled": self.security.enable_masking,
diff --git a/doris_mcp_server/utils/db.py b/doris_mcp_server/utils/db.py
index f6cc9f5..e04b4df 100644
--- a/doris_mcp_server/utils/db.py
+++ b/doris_mcp_server/utils/db.py
@@ -219,7 +219,8 @@ class DorisConnectionManager:
password=self.config.database.password,
db=self.config.database.database,
charset="utf8",
- minsize=0, # Avoid pre-creation issues - create connections on demand
+ minsize=self.config.database.min_connections, # Always 0 per configuration to avoid at_eof issues
+
maxsize=self.config.database.max_connections or 20,
autocommit=True,
connect_timeout=self.connection_timeout,
@@ -234,6 +235,8 @@ class DorisConnectionManager:
self.logger.info(
f"Connection pool initialized successfully with on-demand connection creation, "
+ f"min connections: {self.config.database.min_connections}, "
+
f"max connections: {self.config.database.max_connections or 20}"
)
@@ -472,7 +475,7 @@ class DorisConnectionManager:
if conn.connection and not conn.connection.closed:
await conn.connection.ensure_closed()
except Exception:
- pass
+ pass # Ignore errors during forced close
# Close connection wrapper
await conn.close()
diff --git a/doris_mcp_server/utils/query_executor.py b/doris_mcp_server/utils/query_executor.py
index e7b736b..367dd9c 100644
--- a/doris_mcp_server/utils/query_executor.py
+++ b/doris_mcp_server/utils/query_executor.py
@@ -585,63 +585,57 @@ class DorisQueryExecutor:
timeout=timeout,
cache_enabled=False # Disable cache for MCP calls to ensure fresh data
)
-
+
# Execute query with retry logic
- try:
- result = await self.execute_query(query_request,
auth_context)
-
- # Serialize data for JSON response
- serialized_data = []
- for row in result.data:
- serialized_data.append(self._serialize_row_data(row))
-
- return {
- "success": True,
- "data": serialized_data,
- "row_count": result.row_count,
- "execution_time": result.execution_time,
- "metadata": {
- "columns": result.metadata.get("columns", []),
- "query": sql
- }
+ result = await self.execute_query(query_request, auth_context)
+
+ # Serialize data for JSON response
+ serialized_data = []
+ for row in result.data:
+ serialized_data.append(self._serialize_row_data(row))
+
+ return {
+ "success": True,
+ "data": serialized_data,
+ "row_count": result.row_count,
+ "execution_time": result.execution_time,
+ "metadata": {
+ "columns": result.metadata.get("columns", []),
+ "query": sql
}
-
- except Exception as query_error:
- # Check if it's a connection-related error that we should retry
- error_str = str(query_error).lower()
- connection_errors = [
- "at_eof", "connection", "closed", "nonetype",
- "transport", "reader", "broken pipe", "connection reset"
- ]
-
- is_connection_error = any(err in error_str for err in
connection_errors)
-
- if is_connection_error and retry_count < max_retries:
- retry_count += 1
- self.logger.warning(f"Connection error detected, retrying ({retry_count}/{max_retries}): {query_error}")
-
- # Release the problematic connection
- try:
- await self.connection_manager.release_connection(session_id)
- except Exception:
- pass # Ignore cleanup errors
-
- # Wait a bit before retry
- await asyncio.sleep(0.5 * retry_count)
- continue
- else:
- # Re-raise if not a connection error or max retries exceeded
- raise query_error
-
+ }
+
except Exception as e:
error_msg = str(e)
+ error_str = error_msg.lower()
+
+ # Check if it's a connection-related error that we should retry
+ connection_errors = [
+ "at_eof", "connection", "closed", "nonetype",
+ "transport", "reader", "broken pipe", "connection reset"
+ ]
- # If we've exhausted retries or it's not a connection error, return error
- if retry_count >= max_retries or "at_eof" not in error_msg.lower():
+ is_connection_error = any(err in error_str for err in
connection_errors)
+
+ if is_connection_error and retry_count < max_retries:
+ retry_count += 1
+ self.logger.warning(f"Connection error detected, retrying ({retry_count}/{max_retries}): {e}")
+
+ # Release the problematic connection
+ try:
+ await self.connection_manager.release_connection(session_id)
+ except Exception:
+ pass # Ignore cleanup errors
+
+ # Wait a bit before retry
+ await asyncio.sleep(0.5 * retry_count)
+ continue
+ else:
+ # If we've exhausted retries or it's not a connection error, return error
error_analysis = self._analyze_error(error_msg)
return {
- "success": False,
+ "success": False,
"error": error_analysis.get("user_message", error_msg),
"error_type": error_analysis.get("error_type",
"general_error"),
"data": None,
@@ -651,24 +645,17 @@ class DorisQueryExecutor:
"retry_count": retry_count
}
}
- else:
- # Try one more time for connection errors
- retry_count += 1
- if retry_count <= max_retries:
- self.logger.warning(f"Retrying query due to connection error ({retry_count}/{max_retries}): {e}")
- await asyncio.sleep(0.5 * retry_count)
- continue
- else:
- return {
- "success": False,
- "error": f"Query failed after {max_retries} retries: {error_msg}",
- "data": None,
- "metadata": {
- "query": sql,
- "error_details": error_msg,
- "retry_count": retry_count
- }
- }
+
+ # This should never be reached, but just in case
+ return {
+ "success": False,
+ "error": "Maximum retries exceeded",
+ "data": None,
+ "metadata": {
+ "query": sql,
+ "retry_count": retry_count
+ }
+ }
def _serialize_row_data(self, row_data: Dict[str, Any]) -> Dict[str, Any]:
"""Serialize row data for JSON response"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]