This is an automated email from the ASF dual-hosted git repository.
freeoneplus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-mcp-server.git
The following commit(s) were added to refs/heads/master by this push:
new 4052b7e [BUG] Completely solve the at_eof problem (#20)
4052b7e is described below
commit 4052b7e9389ebfe8b6441ae2d46613c16c1d7757
Author: Yijia Su <[email protected]>
AuthorDate: Thu Jul 10 13:08:32 2025 +0800
[BUG] Completely solve the at_eof problem (#20)
* fix at_eof bug
* update uv.lock
* fix bug and change pool min values
* Fixed startup errors caused by multiple versions of MCP services
* fix connection bug
---
doris_mcp_server/utils/db.py | 971 ++++++++++++++---------------
doris_mcp_server/utils/schema_extractor.py | 50 +-
2 files changed, 480 insertions(+), 541 deletions(-)
diff --git a/doris_mcp_server/utils/db.py b/doris_mcp_server/utils/db.py
index e04b4df..08ed7db 100644
--- a/doris_mcp_server/utils/db.py
+++ b/doris_mcp_server/utils/db.py
@@ -29,6 +29,7 @@ from contextlib import asynccontextmanager
 from dataclasses import dataclass
 from datetime import datetime
 from typing import Any, Dict, List
+import random

 import aiomysql
 from aiomysql import Connection, Pool
@@ -70,6 +71,7 @@ class DorisConnection:
         self.query_count = 0
         self.is_healthy = True
         self.security_manager = security_manager
+        self.logger = logging.getLogger(__name__)

     async def execute(self, sql: str, params: tuple | None = None,
                       auth_context=None) -> QueryResult:
         """Execute SQL query"""
@@ -135,31 +137,46 @@ class DorisConnection:
             raise

     async def ping(self) -> bool:
-        """Check connection health status"""
+        """Check connection health status with enhanced at_eof error detection"""
         try:
-            # Check if connection exists and is not closed
+            # Check 1: Connection exists and is not closed
             if not self.connection or self.connection.closed:
                 self.is_healthy = False
                 return False

-            # Check if connection has _reader (aiomysql internal state)
-            # This prevents the 'NoneType' object has no attribute 'at_eof' error
-            if not hasattr(self.connection, '_reader') or self.connection._reader is None:
+            # Check 2: Use ONLY safe operations - avoid internal state access
+            # Instead of checking _reader state directly, use a simple query test
+            try:
+                # Use a simple query with timeout instead of ping() to avoid at_eof issues
+                async with asyncio.timeout(3):  # 3 second timeout
+                    async with self.connection.cursor() as cursor:
+                        await cursor.execute("SELECT 1")
+                        result = await cursor.fetchone()
+                        if result and result[0] == 1:
+                            self.is_healthy = True
+                            return True
+                        else:
+                            self.logger.debug(f"Connection {self.session_id} ping query returned unexpected result")
+                            self.is_healthy = False
+                            return False
+
+            except asyncio.TimeoutError:
+                self.logger.debug(f"Connection {self.session_id} ping timed out")
                 self.is_healthy = False
                 return False
-
-            # Additional check for reader's state
-            if hasattr(self.connection._reader, '_transport') and self.connection._reader._transport is None:
+            except Exception as query_error:
+                # Check for specific at_eof related errors
+                error_str = str(query_error).lower()
+                if 'at_eof' in error_str or 'nonetype' in error_str:
+                    self.logger.debug(f"Connection {self.session_id} ping failed with at_eof error: {query_error}")
+                else:
+                    self.logger.debug(f"Connection {self.session_id} ping failed: {query_error}")
                 self.is_healthy = False
                 return False
-
-            # Try to ping the connection
-            await self.connection.ping()
-            self.is_healthy = True
-            return True
-        except (AttributeError, OSError, ConnectionError, Exception) as e:
-            # Log the specific error for debugging
-            logging.debug(f"Connection ping failed for session {self.session_id}: {e}")
+
+        except Exception as e:
+            # Catch any other unexpected errors
+            self.logger.debug(f"Connection {self.session_id} ping failed with unexpected error: {e}")
             self.is_healthy = False
             return False
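
The rewritten ping() above is the core of the at_eof fix: instead of aiomysql's Connection.ping(), which can dereference a torn-down internal _reader and raise "'NoneType' object has no attribute 'at_eof'", it round-trips a real SELECT 1 under a timeout. A standalone sketch of the same pattern (the function name and the 3-second budget are illustrative, and asyncio.timeout assumes Python 3.11+):

    import asyncio

    import aiomysql


    async def connection_is_alive(conn: aiomysql.Connection) -> bool:
        # Probe with a real query instead of Connection.ping(): a full
        # round trip fails cleanly on a half-dead connection instead of
        # tripping over aiomysql's internal stream-reader state.
        if conn is None or conn.closed:
            return False
        try:
            async with asyncio.timeout(3):  # illustrative budget
                async with conn.cursor() as cursor:
                    await cursor.execute("SELECT 1")
                    row = await cursor.fetchone()
                    return bool(row) and row[0] == 1
        except asyncio.TimeoutError:
            return False
        except Exception:
            # at_eof / NoneType errors land here and mark the connection
            # unhealthy rather than crashing the caller
            return False
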
@@ -173,482 +190,410 @@ class DorisConnection:

 class DorisConnectionManager:
-    """Doris database connection manager
+    """Doris database connection manager - Simplified Strategy

-    Provides connection pool management, connection health monitoring, fault recovery and other functions
-    Supports session-level connection reuse and intelligent load balancing
-    Integrates security manager to provide unified security validation and data masking
+    Uses direct connection pool management without session-level caching
+    Implements connection pool health monitoring and proactive cleanup
     """

     def __init__(self, config, security_manager=None):
         self.config = config
+        self.security_manager = security_manager
         self.pool: Pool | None = None
-        self.session_connections: dict[str, DorisConnection] = {}
-        self.metrics = ConnectionMetrics()
         self.logger = logging.getLogger(__name__)
-        self.security_manager = security_manager
-
-        # Health check configuration
-        self.health_check_interval = config.database.health_check_interval or 60
-        self.max_connection_age = config.database.max_connection_age or 3600
-        self.connection_timeout = config.database.connection_timeout or 30
-
-        # Start background tasks
-        self._health_check_task = None
-        self._cleanup_task = None
+        self.metrics = ConnectionMetrics()
+
+        # Remove session-level connection management
+        # self.session_connections = {}  # REMOVED
+
+        # Pool health monitoring
+        self.health_check_interval = 30  # seconds
+        self.pool_warmup_size = 3  # connections to maintain
+        self.pool_health_check_task = None
+        self.pool_cleanup_task = None
+
+        # Pool recovery lock to prevent race conditions
+        self.pool_recovery_lock = asyncio.Lock()
+        self.pool_recovering = False
+
+        # Database connection parameters from config.database
+        self.host = config.database.host
+        self.port = config.database.port
+        self.user = config.database.user
+        self.password = config.database.password
+        self.database = config.database.database
+        # Convert charset to aiomysql compatible format
+        charset_map = {"UTF8": "utf8", "UTF8MB4": "utf8mb4"}
+        self.charset = charset_map.get(config.database.charset.upper(), config.database.charset.lower())
+        self.connect_timeout = config.database.connection_timeout
+
+        # Connection pool parameters - more conservative settings
+        self.minsize = config.database.min_connections  # This is always 0
+        self.maxsize = config.database.max_connections or 10
+        self.pool_recycle = config.database.max_connection_age or 3600  # 1 hour, more conservative
    async def initialize(self):
-        """Initialize connection manager"""
+        """Initialize connection pool with health monitoring"""
        try:
-            self.logger.info(f"Initializing connection pool to {self.config.database.host}:{self.config.database.port}")
-
-            # Validate configuration
-            if not self.config.database.host:
-                raise ValueError("Database host is required")
-            if not self.config.database.user:
-                raise ValueError("Database user is required")
-            if not self.config.database.password:
-                self.logger.warning("Database password is empty, this may cause connection issues")
-
-            # Create connection pool with improved stability parameters
-            # Key change: Set minsize=0 to avoid pre-creation issues that cause at_eof errors
+            self.logger.info(f"Initializing connection pool to {self.host}:{self.port}")
+
+            # Create connection pool
            self.pool = await aiomysql.create_pool(
-                host=self.config.database.host,
-                port=self.config.database.port,
-                user=self.config.database.user,
-                password=self.config.database.password,
-                db=self.config.database.database,
-                charset="utf8",
-                minsize=self.config.database.min_connections,  # Always 0 per configuration to avoid at_eof issues
-
-                maxsize=self.config.database.max_connections or 20,
-                autocommit=True,
-                connect_timeout=self.connection_timeout,
-                # Enhanced stability parameters
-                pool_recycle=7200,  # Recycle connections every 2 hours
-                echo=False,  # Don't echo SQL statements
-            )
-
-            # Test the connection pool with a more robust test
-            if not await self._robust_connection_test():
-                raise RuntimeError("Connection pool robust test failed")
-
-            self.logger.info(
-                f"Connection pool initialized successfully with on-demand connection creation, "
-                f"min connections: {self.config.database.min_connections}, "
-
-                f"max connections: {self.config.database.max_connections or 20}"
+                host=self.host,
+                port=self.port,
+                user=self.user,
+                password=self.password,
+                db=self.database,
+                charset=self.charset,
+                minsize=self.minsize,
+                maxsize=self.maxsize,
+                pool_recycle=self.pool_recycle,
+                connect_timeout=self.connect_timeout,
+                autocommit=True
            )
-
+
+            # Test initial connection
+            if not await self._test_pool_health():
+                raise RuntimeError("Connection pool health check failed")
+
            # Start background monitoring tasks
-            self._health_check_task = asyncio.create_task(self._health_check_loop())
-            self._cleanup_task = asyncio.create_task(self._cleanup_loop())
+            self.pool_health_check_task = asyncio.create_task(self._pool_health_monitor())
+            self.pool_cleanup_task = asyncio.create_task(self._pool_cleanup_monitor())
+
+            # Perform initial pool warmup
+            await self._warmup_pool()
+
+            self.logger.info(f"Connection pool initialized successfully, min connections: {self.minsize}, max connections: {self.maxsize}")
+
+        except Exception as e:
+            self.logger.error(f"Failed to initialize connection pool: {e}")
+            raise
+
+    async def _test_pool_health(self) -> bool:
+        """Test connection pool health"""
+        try:
+            async with self.pool.acquire() as conn:
+                async with conn.cursor() as cursor:
+                    await cursor.execute("SELECT 1")
+                    result = await cursor.fetchone()
+                    return result and result[0] == 1
        except Exception as e:
-            self.logger.error(f"Connection pool initialization failed: {e}")
-            # Clean up partial initialization
-            if self.pool:
+            self.logger.error(f"Pool health test failed: {e}")
+            return False
+
+    async def _warmup_pool(self):
+        """Warm up connection pool by creating initial connections"""
+        self.logger.info(f"🔥 Warming up connection pool with {self.pool_warmup_size} connections")
+
+        warmup_connections = []
+        try:
+            # Acquire connections to force pool to create them
+            for i in range(self.pool_warmup_size):
                try:
-                    self.pool.close()
-                    await self.pool.wait_closed()
+                    conn = await self.pool.acquire()
+                    warmup_connections.append(conn)
+                    self.logger.debug(f"Warmed up connection {i+1}/{self.pool_warmup_size}")
+                except Exception as e:
+                    self.logger.warning(f"Failed to warm up connection {i+1}: {e}")
+                    break
+
+            # Release all warmup connections back to pool
+            for conn in warmup_connections:
+                try:
+                    self.pool.release(conn)
+                except Exception as e:
+                    self.logger.warning(f"Failed to release warmup connection: {e}")
+
+            self.logger.info(f"✅ Pool warmup completed, {len(warmup_connections)} connections created")
+
+        except Exception as e:
+            self.logger.error(f"Pool warmup failed: {e}")
+            # Clean up any remaining connections
+            for conn in warmup_connections:
+                try:
+                    await conn.ensure_closed()
                except Exception:
                    pass
-            self.pool = None
-            raise
-    async def _robust_connection_test(self) -> bool:
-        """Perform a robust connection test that validates full connection health"""
-        max_retries = 3
-        for attempt in range(max_retries):
+    async def _pool_health_monitor(self):
+        """Background task to monitor pool health"""
+        self.logger.info("🩺 Starting pool health monitor")
+
+        while True:
            try:
-                self.logger.debug(f"Testing connection pool (attempt {attempt + 1}/{max_retries})")
-
-                # Test connection creation and validation
-                test_conn = await self._create_raw_connection_with_validation()
-                if test_conn:
-                    # Test basic query execution
-                    async with test_conn.cursor() as cursor:
-                        await cursor.execute("SELECT 1")
-                        result = await cursor.fetchone()
-                        if result and result[0] == 1:
-                            self.logger.debug("Connection pool test successful")
-                            # Return connection to pool
-                            if self.pool:
-                                self.pool.release(test_conn)
-                            return True
-                        else:
-                            self.logger.warning("Connection test query returned unexpected result")
-
-                    # Close test connection if we get here
-                    await test_conn.ensure_closed()
-
+                await asyncio.sleep(self.health_check_interval)
+                await self._check_pool_health()
+            except asyncio.CancelledError:
+                self.logger.info("Pool health monitor stopped")
+                break
            except Exception as e:
-                self.logger.warning(f"Connection test attempt {attempt + 1} failed: {e}")
-                if attempt == max_retries - 1:
-                    self.logger.error("All connection test attempts failed")
-                    return False
-                else:
-                    # Wait before retry
-                    await asyncio.sleep(1.0 * (attempt + 1))
-
-        return False
+                self.logger.error(f"Pool health monitor error: {e}")

-    async def _create_raw_connection_with_validation(self, max_retries: int = 3):
-        """Create a raw connection with comprehensive validation"""
-        for attempt in range(max_retries):
+    async def _pool_cleanup_monitor(self):
+        """Background task to clean up stale connections"""
+        self.logger.info("🧹 Starting pool cleanup monitor")
+
+        while True:
            try:
-                if not self.pool:
-                    raise RuntimeError("Connection pool not initialized")
-
-                # Acquire connection from pool
-                raw_connection = await self.pool.acquire()
-
-                # Basic connection validation
-                if not raw_connection:
-                    self.logger.warning(f"Pool returned None connection (attempt {attempt + 1})")
-                    continue
-
-                if raw_connection.closed:
-                    self.logger.warning(f"Pool returned closed connection (attempt {attempt + 1})")
-                    continue
-
-                # Perform a simple ping test instead of checking internal state
-                # Internal state (_reader, _transport) might not be fully initialized yet
-                try:
-                    # Test basic connectivity with a simple query
-                    async with raw_connection.cursor() as cursor:
-                        await cursor.execute("SELECT 1")
-                        result = await cursor.fetchone()
-                        if result and result[0] == 1:
-                            self.logger.debug(f"Successfully created and validated raw connection (attempt {attempt + 1})")
-                            return raw_connection
-                        else:
-                            self.logger.warning(f"Connection test query failed (attempt {attempt + 1})")
-                            await raw_connection.ensure_closed()
-                            continue
-
-                except Exception as e:
-                    # Check if this is an at_eof error specifically
-                    error_str = str(e).lower()
-                    if 'at_eof' in error_str or 'nonetype' in error_str:
-                        self.logger.warning(f"Connection has at_eof issue (attempt {attempt + 1}): {e}")
-                    else:
-                        self.logger.warning(f"Connection test failed (attempt {attempt + 1}): {e}")
-
-                    try:
-                        await raw_connection.ensure_closed()
-                    except Exception:
-                        pass
-                    continue
-
+                await asyncio.sleep(self.health_check_interval * 2)  # Less frequent cleanup
+                await self._cleanup_stale_connections()
+            except asyncio.CancelledError:
+                self.logger.info("Pool cleanup monitor stopped")
+                break
            except Exception as e:
-                self.logger.warning(f"Raw connection creation attempt {attempt + 1} failed: {e}")
-                if attempt == max_retries - 1:
-                    raise RuntimeError(f"Failed to create valid connection after {max_retries} attempts: {e}")
-                else:
-                    # Exponential backoff
-                    await asyncio.sleep(0.5 * (2 ** attempt))
-
-        raise RuntimeError("Failed to create valid connection")
-
-    async def get_connection(self, session_id: str) -> DorisConnection:
-        """Get database connection with enhanced reliability
-
-        Supports session-level connection reuse to improve performance and consistency
-        """
-        # Check if there's an existing session connection
-        if session_id in self.session_connections:
-            conn = self.session_connections[session_id]
-            # Enhanced connection health check
-            if await self._comprehensive_connection_health_check(conn):
-                return conn
-            else:
-                # Connection is unhealthy, clean up and create new one
-                self.logger.debug(f"Existing connection unhealthy for session {session_id}, creating new one")
-                await self._cleanup_session_connection(session_id)
-
-        # Create new connection with retry logic
-        return await self._create_new_connection_with_retry(session_id)
+                self.logger.error(f"Pool cleanup monitor error: {e}")
-    async def _comprehensive_connection_health_check(self, conn: DorisConnection) -> bool:
-        """Perform comprehensive connection health check"""
+    async def _check_pool_health(self):
+        """Check and maintain pool health"""
        try:
-            # Check basic connection state
-            if not conn.connection or conn.connection.closed:
-                return False
-
-            # Instead of checking internal state, perform a simple ping test
-            # This is more reliable and less dependent on aiomysql internals
-            if not await conn.ping():
-                return False
-
-            return True
+            # Skip health check if already recovering
+            if self.pool_recovering:
+                self.logger.debug("Pool recovery in progress, skipping health check")
+                return
+
+            # Test pool with a simple query
+            health_ok = await self._test_pool_health()

-        except Exception as e:
-            # Check for at_eof errors specifically
-            error_str = str(e).lower()
-            if 'at_eof' in error_str:
-                self.logger.debug(f"Connection health check failed with at_eof error: {e}")
+            if health_ok:
+                self.logger.debug("✅ Pool health check passed")
+                self.metrics.last_health_check = datetime.utcnow()
            else:
-                self.logger.debug(f"Connection health check failed: {e}")
-            return False
-
-    async def _create_new_connection_with_retry(self, session_id: str, max_retries: int = 3) -> DorisConnection:
-        """Create new database connection with retry logic"""
-        for attempt in range(max_retries):
-            try:
-                # Get validated raw connection
-                raw_connection = await self._create_raw_connection_with_validation()
+                self.logger.warning("❌ Pool health check failed, attempting recovery")
+                await self._recover_pool()

-                # Create wrapped connection
-                doris_conn = DorisConnection(raw_connection, session_id, self.security_manager)
-
-                # Comprehensive connection test
-                if await self._comprehensive_connection_health_check(doris_conn):
-                    # Store in session connections
-                    self.session_connections[session_id] = doris_conn
-                    self.metrics.total_connections += 1
-                    self.logger.debug(f"Successfully created new connection for session: {session_id}")
-                    return doris_conn
-                else:
-                    # Connection failed health check, clean up and retry
-                    self.logger.warning(f"New connection failed health check for session {session_id} (attempt {attempt + 1})")
-                    try:
-                        await doris_conn.close()
-                    except Exception:
-                        pass
-
-            except Exception as e:
-                self.logger.warning(f"Connection creation attempt {attempt + 1} failed for session {session_id}: {e}")
-                if attempt == max_retries - 1:
-                    self.metrics.connection_errors += 1
-                    raise RuntimeError(f"Failed to create connection for session {session_id} after {max_retries} attempts: {e}")
-                else:
-                    # Exponential backoff
-                    await asyncio.sleep(0.5 * (2 ** attempt))
-
-        raise RuntimeError(f"Unexpected failure in connection creation for session {session_id}")
-
-    async def release_connection(self, session_id: str):
-        """Release session connection"""
-        if session_id in self.session_connections:
-            await self._cleanup_session_connection(session_id)
+        except Exception as e:
+            self.logger.error(f"Pool health check error: {e}")
+            await self._recover_pool()
-    async def _cleanup_session_connection(self, session_id: str):
-        """Clean up session connection with enhanced safety"""
-        if session_id in self.session_connections:
-            conn = self.session_connections[session_id]
-            try:
-                # Simplified connection validation before returning to pool
-                connection_healthy = False
+    async def _cleanup_stale_connections(self):
+        """Proactively clean up potentially stale connections"""
+        try:
+            self.logger.debug("🧹 Checking for stale connections")
+
+            # Get pool statistics
+            pool_size = self.pool.size
+            pool_free = self.pool.freesize
+
+            # If pool has idle connections, test some of them
+            if pool_free > 0:
+                test_count = min(pool_free, 2)  # Test up to 2 idle connections

-                if (self.pool and
-                    conn.connection and
-                    not conn.connection.closed):
-
-                    # Test if connection is still healthy with a simple check
+                for i in range(test_count):
                    try:
-                        # Quick ping test to see if connection is usable
-                        async with conn.connection.cursor() as cursor:
-                            await cursor.execute("SELECT 1")
+                        # Acquire connection, test it, and release
+                        conn = await asyncio.wait_for(self.pool.acquire(), timeout=5)
+
+                        # Quick test
+                        async with conn.cursor() as cursor:
+                            await asyncio.wait_for(cursor.execute("SELECT 1"), timeout=3)
                            await cursor.fetchone()
-                        connection_healthy = True
-                    except Exception as test_error:
-                        self.logger.debug(f"Connection health test failed for session {session_id}: {test_error}")
-                        connection_healthy = False
-
-                if connection_healthy:
-                    # Connection appears healthy, return to pool
-                    try:
-                        self.pool.release(conn.connection)
-                        self.logger.debug(f"Successfully returned connection to pool for session {session_id}")
-                    except Exception as pool_error:
-                        self.logger.debug(f"Failed to return connection to pool for session {session_id}: {pool_error}")
+
+                        # Connection is healthy, release it
+                        self.pool.release(conn)
+
+                    except asyncio.TimeoutError:
+                        self.logger.debug(f"Stale connection test {i+1} timed out")
                        try:
-                            await conn.connection.ensure_closed()
+                            await conn.ensure_closed()
                        except Exception:
                            pass
-                else:
-                    # Connection is unhealthy, force close
-                    self.logger.debug(f"Connection unhealthy for session {session_id}, force closing")
+                    except Exception as e:
+                        self.logger.debug(f"Stale connection test {i+1} failed: {e}")
+                        try:
+                            await conn.ensure_closed()
+                        except Exception:
+                            pass
+
+            self.logger.debug(f"Stale connection cleanup completed, tested {test_count} connections")
+
+        except Exception as e:
+            self.logger.error(f"Stale connection cleanup error: {e}")
+
+    async def _recover_pool(self):
+        """Recover connection pool when health check fails"""
+        # Use lock to prevent concurrent recovery attempts
+        async with self.pool_recovery_lock:
+            # Check if another recovery is already in progress
+            if self.pool_recovering:
+                self.logger.debug("Pool recovery already in progress, waiting...")
+                return
+
+            try:
+                self.pool_recovering = True
+                max_retries = 3
+                retry_delay = 5  # seconds
+
+                for attempt in range(max_retries):
                    try:
-                        if conn.connection and not conn.connection.closed:
-                            await conn.connection.ensure_closed()
-                    except Exception:
-                        pass  # Ignore errors during forced close
+                        self.logger.info(f"🔄 Attempting pool recovery (attempt {attempt + 1}/{max_retries})")
+
+                        # Try to close existing pool with timeout
+                        if self.pool:
+                            try:
+                                if not self.pool.closed:
+                                    self.pool.close()
+                                    await asyncio.wait_for(self.pool.wait_closed(), timeout=3.0)
+                                    self.logger.debug("Old pool closed successfully")
+                            except asyncio.TimeoutError:
+                                self.logger.warning("Pool close timeout, forcing cleanup")
+                            except Exception as e:
+                                self.logger.warning(f"Error closing old pool: {e}")
+                            finally:
+                                self.pool = None
+
+                        # Wait before creating new pool (reduced delay)
+                        if attempt > 0:
+                            await asyncio.sleep(2)  # Reduced from 5 to 2 seconds
+
+                        # Recreate pool with timeout
+                        self.logger.debug("Creating new connection pool...")
+                        self.pool = await asyncio.wait_for(
+                            aiomysql.create_pool(
+                                host=self.host,
+                                port=self.port,
+                                user=self.user,
+                                password=self.password,
+                                db=self.database,
+                                charset=self.charset,
+                                minsize=self.minsize,
+                                maxsize=self.maxsize,
+                                pool_recycle=self.pool_recycle,
+                                connect_timeout=self.connect_timeout,
+                                autocommit=True
+                            ),
+                            timeout=10.0
+                        )
+
+                        # Test recovered pool with timeout
+                        if await asyncio.wait_for(self._test_pool_health(), timeout=5.0):
+                            self.logger.info(f"✅ Pool recovery successful on attempt {attempt + 1}")
+                            # Re-warm the pool with timeout
+                            try:
+                                await asyncio.wait_for(self._warmup_pool(), timeout=5.0)
+                            except asyncio.TimeoutError:
+                                self.logger.warning("Pool warmup timeout, but recovery successful")
+                            return
+                        else:
+                            self.logger.warning(f"❌ Pool recovery health check failed on attempt {attempt + 1}")
+
+                    except asyncio.TimeoutError:
+                        self.logger.error(f"Pool recovery attempt {attempt + 1} timed out")
+                        if self.pool:
+                            try:
+                                self.pool.close()
+                            except:
+                                pass
+                            self.pool = None
+                    except Exception as e:
+                        self.logger.error(f"Pool recovery error on attempt {attempt + 1}: {e}")
+
+                        # Clean up failed pool
+                        if self.pool:
+                            try:
+                                self.pool.close()
+                                await asyncio.wait_for(self.pool.wait_closed(), timeout=2.0)
+                            except Exception:
+                                pass
+                            finally:
+                                self.pool = None
-                # Close connection wrapper
-                await conn.close()
+                # All recovery attempts failed
+                self.logger.error("❌ Pool recovery failed after all attempts")
+                self.pool = None
-            except Exception as e:
-                self.logger.error(f"Error cleaning up connection for session {session_id}: {e}")
-                # Force close if normal cleanup fails
-                try:
-                    if conn.connection and not conn.connection.closed:
-                        await conn.connection.ensure_closed()
-                except Exception:
-                    pass  # Ignore errors during forced close
            finally:
-                # Remove from session connections
-                del self.session_connections[session_id]
-                self.logger.debug(f"Cleaned up connection for session: {session_id}")
-
-    async def _health_check_loop(self):
-        """Background health check loop"""
-        while True:
-            try:
-                await asyncio.sleep(self.health_check_interval)
-                await self._perform_health_check()
-            except asyncio.CancelledError:
-                break
-            except Exception as e:
-                self.logger.error(f"Health check error: {e}")
+                self.pool_recovering = False
-    async def _perform_health_check(self):
-        """Perform enhanced health check"""
+    async def get_connection(self, session_id: str) -> DorisConnection:
+        """Get database connection - Simplified Strategy with pool validation
+
+        Always acquire fresh connection from pool, no session caching
+        """
        try:
-            unhealthy_sessions = []
-
-            # Enhanced health check with comprehensive validation
-            for session_id, conn in self.session_connections.items():
-                if not await self._comprehensive_connection_health_check(conn):
-                    unhealthy_sessions.append(session_id)
-
-            # Check for stale connections (over 30 minutes old)
-            current_time = datetime.utcnow()
-            stale_sessions = []
-            for session_id, conn in self.session_connections.items():
-                if session_id not in unhealthy_sessions:  # Don't double-check
-                    last_used_delta = (current_time - conn.last_used).total_seconds()
-                    if last_used_delta > 1800:  # 30 minutes
-                        # Force a comprehensive health check for stale connections
-                        if not await self._comprehensive_connection_health_check(conn):
-                            stale_sessions.append(session_id)
-
-            all_problematic_sessions = list(set(unhealthy_sessions + stale_sessions))
-
-            # Clean up problematic connections
-            for session_id in all_problematic_sessions:
-                await self._cleanup_session_connection(session_id)
-                self.metrics.failed_connections += 1
-
-            # Update metrics
-            await self._update_connection_metrics()
-            self.metrics.last_health_check = datetime.utcnow()
-
-            if all_problematic_sessions:
-                self.logger.warning(f"Health check: cleaned up {len(unhealthy_sessions)} unhealthy and {len(stale_sessions)} stale connections")
-            else:
-                self.logger.debug(f"Health check: all {len(self.session_connections)} connections healthy")
+            # Wait for any ongoing recovery to complete
+            if self.pool_recovering:
+                self.logger.debug(f"Pool recovery in progress, waiting for completion...")
+                # Wait for recovery to complete (max 10 seconds)
+                for _ in range(10):
+                    if not self.pool_recovering:
+                        break
+                    await asyncio.sleep(0.5)
+
+                if self.pool_recovering:
+                    self.logger.error("Pool recovery is taking too long, proceeding anyway")
+                    # Don't raise error, try to continue
+
+            # Check if pool is available
+            if not self.pool:
+                self.logger.warning("Connection pool is not available, attempting recovery...")
+                await self._recover_pool()
+
+                if not self.pool:
+                    raise RuntimeError("Connection pool is not available and recovery failed")
+
+            # Check if pool is closed
+            if self.pool.closed:
+                self.logger.warning("Connection pool is closed, attempting recovery...")
+                await self._recover_pool()
+
+                if not self.pool or self.pool.closed:
+                    raise RuntimeError("Connection pool is closed and recovery failed")
+
+            # Simple strategy: always get fresh connection from pool
+            raw_conn = await self.pool.acquire()
+
+            # Wrap in DorisConnection
+            doris_conn = DorisConnection(raw_conn, session_id, self.security_manager)
+
+            # Simple validation - just check if connection is open
+            if raw_conn.closed:
+                raise RuntimeError("Acquired connection is already closed")
+
+            self.logger.debug(f"✅ Acquired fresh connection for session {session_id}")
+            return doris_conn
+
+        except Exception as e:
+            self.logger.error(f"Failed to get connection for session {session_id}: {e}")
+            raise
+
+    async def release_connection(self, session_id: str, connection: DorisConnection):
+        """Release connection back to pool - Simplified Strategy"""
+        try:
+            if connection and connection.connection:
+                # Simple strategy: always return to pool
+                if not connection.connection.closed:
+                    self.pool.release(connection.connection)
+                    self.logger.debug(f"✅ Released connection for session {session_id}")
+                else:
+                    self.logger.debug(f"Connection already closed for session {session_id}")
+
        except Exception as e:
-            self.logger.error(f"Health check failed: {e}")
-            # If health check fails, try to diagnose the issue
+            self.logger.error(f"Error releasing connection for session {session_id}: {e}")
+            # Force close if release fails
            try:
-                diagnosis = await self.diagnose_connection_health()
-                self.logger.error(f"Connection diagnosis: {diagnosis}")
+                if connection and connection.connection:
+                    await connection.connection.ensure_closed()
            except Exception:
-                pass  # Don't let diagnosis failure crash health check
-
-    async def _cleanup_loop(self):
-        """Background cleanup loop"""
-        while True:
-            try:
-                await asyncio.sleep(300)  # Run every 5 minutes
-                await self._cleanup_idle_connections()
-            except asyncio.CancelledError:
-                break
-            except Exception as e:
-                self.logger.error(f"Cleanup loop error: {e}")
-
-    async def _cleanup_idle_connections(self):
-        """Clean up idle connections"""
-        current_time = datetime.utcnow()
-        idle_sessions = []
-
-        for session_id, conn in self.session_connections.items():
-            # Check if connection has exceeded maximum age
-            age = (current_time - conn.created_at).total_seconds()
-            if age > self.max_connection_age:
-                idle_sessions.append(session_id)
-
-        # Clean up idle connections
-        for session_id in idle_sessions:
-            await self._cleanup_session_connection(session_id)
-
-        if idle_sessions:
-            self.logger.info(f"Cleaned up {len(idle_sessions)} idle connections")
-
-    async def _update_connection_metrics(self):
-        """Update connection metrics"""
-        self.metrics.active_connections = len(self.session_connections)
-        if self.pool:
-            self.metrics.idle_connections = self.pool.freesize
-
-    async def get_metrics(self) -> ConnectionMetrics:
-        """Get connection metrics"""
-        await self._update_connection_metrics()
-        return self.metrics
-
-    async def execute_query(
-        self, session_id: str, sql: str, params: tuple | None = None, auth_context=None
-    ) -> QueryResult:
-        """Execute query with enhanced error handling and retry logic"""
-        max_retries = 2
-        for attempt in range(max_retries):
-            try:
-                conn = await self.get_connection(session_id)
-                return await conn.execute(sql, params, auth_context)
-            except Exception as e:
-                error_msg = str(e).lower()
-                # Check for connection-related errors that warrant retry
-                is_connection_error = any(keyword in error_msg for keyword in [
-                    'at_eof', 'connection', 'closed', 'nonetype', 'reader', 'transport'
-                ])
-
-                if is_connection_error and attempt < max_retries - 1:
-                    self.logger.warning(f"Connection error during query execution (attempt {attempt + 1}): {e}")
-                    # Clean up the problematic connection
-                    await self.release_connection(session_id)
-                    # Wait before retry
-                    await asyncio.sleep(0.5 * (attempt + 1))
-                    continue
-                else:
-                    # Not a connection error or final retry - re-raise
-                    raise
-
-    @asynccontextmanager
-    async def get_connection_context(self, session_id: str):
-        """Get connection context manager"""
-        conn = await self.get_connection(session_id)
-        try:
-            yield conn
-        finally:
-            # Connection will be reused, no need to close here
-            pass
+                pass

    async def close(self):
        """Close connection manager"""
        try:
            # Cancel background tasks
-            if self._health_check_task:
-                self._health_check_task.cancel()
+            if self.pool_health_check_task:
+                self.pool_health_check_task.cancel()
                try:
-                    await self._health_check_task
+                    await self.pool_health_check_task
                except asyncio.CancelledError:
                    pass

-            if self._cleanup_task:
-                self._cleanup_task.cancel()
+            if self.pool_cleanup_task:
+                self.pool_cleanup_task.cancel()
                try:
-                    await self._cleanup_task
+                    await self.pool_cleanup_task
                except asyncio.CancelledError:
                    pass

-            # Clean up all session connections
-            for session_id in list(self.session_connections.keys()):
-                await self._cleanup_session_connection(session_id)
-
            # Close connection pool
            if self.pool:
                self.pool.close()
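
Both monitors registered in initialize() share the standard long-lived asyncio task shape: sleep an interval, run the check, log and swallow errors, exit only on cancellation. A minimal sketch of that shape (monitor_loop and its arguments are illustrative names, not from the patch):

    import asyncio
    import logging
    from typing import Awaitable, Callable

    logger = logging.getLogger(__name__)


    async def monitor_loop(interval: float, check: Callable[[], Awaitable[None]]) -> None:
        while True:
            try:
                await asyncio.sleep(interval)
                await check()
            except asyncio.CancelledError:
                # close() cancels the task; exit quietly
                break
            except Exception as exc:
                # keep the monitor alive across transient failures
                logger.error(f"monitor error: {exc}")
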
@@ -661,15 +606,62 @@ class DorisConnectionManager:

    async def test_connection(self) -> bool:
        """Test database connection using robust connection test"""
-        return await self._robust_connection_test()
+        return await self._test_pool_health()
+
+    async def get_metrics(self) -> ConnectionMetrics:
+        """Get connection pool metrics - Simplified Strategy"""
+        try:
+            if self.pool:
+                self.metrics.idle_connections = self.pool.freesize
+                self.metrics.active_connections = self.pool.size - self.pool.freesize
+            else:
+                self.metrics.idle_connections = 0
+                self.metrics.active_connections = 0
+
+            return self.metrics
+        except Exception as e:
+            self.logger.error(f"Error getting metrics: {e}")
+            return self.metrics
+
+    async def execute_query(
+        self, session_id: str, sql: str, params: tuple | None = None, auth_context=None
+    ) -> QueryResult:
+        """Execute query - Simplified Strategy with automatic connection management"""
+        connection = None
+        try:
+            # Always get fresh connection from pool
+            connection = await self.get_connection(session_id)
+
+            # Execute query
+            result = await connection.execute(sql, params, auth_context)
+
+            return result
+
+        except Exception as e:
+            self.logger.error(f"Query execution failed for session {session_id}: {e}")
+            raise
+        finally:
+            # Always release connection back to pool
+            if connection:
+                await self.release_connection(session_id, connection)
+
+    @asynccontextmanager
+    async def get_connection_context(self, session_id: str):
+        """Get connection context manager - Simplified Strategy"""
+        connection = None
+        try:
+            connection = await self.get_connection(session_id)
+            yield connection
+        finally:
+            if connection:
+                await self.release_connection(session_id, connection)

    async def diagnose_connection_health(self) -> Dict[str, Any]:
-        """Diagnose connection pool and session health"""
+        """Diagnose connection pool health - Simplified Strategy"""
        diagnosis = {
            "timestamp": datetime.utcnow().isoformat(),
            "pool_status": "unknown",
-            "session_connections": {},
-            "problematic_connections": [],
+            "pool_info": {},
            "recommendations": []
        }
@@ -693,55 +685,16 @@ class DorisConnectionManager:
                "max_size": self.pool.maxsize
            }

-            # Check session connections
-            problematic_sessions = []
-            for session_id, conn in self.session_connections.items():
-                conn_status = {
-                    "session_id": session_id,
-                    "created_at": conn.created_at.isoformat(),
-                    "last_used": conn.last_used.isoformat(),
-                    "query_count": conn.query_count,
-                    "is_healthy": conn.is_healthy
-                }
-
-                # Detailed connection checks
-                if conn.connection:
-                    conn_status["connection_closed"] = conn.connection.closed
-                    conn_status["has_reader"] = hasattr(conn.connection, '_reader') and conn.connection._reader is not None
-
-                    if hasattr(conn.connection, '_reader') and conn.connection._reader:
-                        conn_status["reader_transport"] = conn.connection._reader._transport is not None
-                    else:
-                        conn_status["reader_transport"] = False
-                else:
-                    conn_status["connection_closed"] = True
-                    conn_status["has_reader"] = False
-                    conn_status["reader_transport"] = False
-
-                # Check if connection is problematic
-                if (not conn.is_healthy or
-                    conn_status["connection_closed"] or
-                    not conn_status["has_reader"] or
-                    not conn_status["reader_transport"]):
-                    problematic_sessions.append(session_id)
-                    diagnosis["problematic_connections"].append(conn_status)
-
-                diagnosis["session_connections"][session_id] = conn_status
-
-            # Generate recommendations
-            if problematic_sessions:
-                diagnosis["recommendations"].append(f"Clean up {len(problematic_sessions)} problematic connections")
-
+            # Generate recommendations based on pool status
            if self.pool.freesize == 0 and self.pool.size >= self.pool.maxsize:
                diagnosis["recommendations"].append("Connection pool exhausted - consider increasing max_connections")

-            # Auto-cleanup problematic connections
-            for session_id in problematic_sessions:
-                try:
-                    await self._cleanup_session_connection(session_id)
-                    self.logger.info(f"Auto-cleaned problematic connection for session: {session_id}")
-                except Exception as e:
-                    self.logger.error(f"Failed to auto-clean session {session_id}: {e}")
+            # Test pool health
+            if await self._test_pool_health():
+                diagnosis["pool_health"] = "healthy"
+            else:
+                diagnosis["pool_health"] = "unhealthy"
+                diagnosis["recommendations"].append("Pool health check failed - may need recovery")

        return diagnosis
@@ -768,7 +721,8 @@ class ConnectionPoolMonitor:
        status = {
            "pool_size": self.connection_manager.pool.size if self.connection_manager.pool else 0,
            "free_connections": self.connection_manager.pool.freesize if self.connection_manager.pool else 0,
-            "active_sessions": len(self.connection_manager.session_connections),
+            "active_connections": metrics.active_connections,
+            "idle_connections": metrics.idle_connections,
            "total_connections": metrics.total_connections,
            "failed_connections": metrics.failed_connections,
            "connection_errors": metrics.connection_errors,
@@ -779,54 +733,33 @@ class ConnectionPoolMonitor:
        return status

    async def get_session_details(self) -> list[dict[str, Any]]:
-        """Get session connection details"""
-        sessions = []
-
-        for session_id, conn in self.connection_manager.session_connections.items():
-            session_info = {
-                "session_id": session_id,
-                "created_at": conn.created_at.isoformat(),
-                "last_used": conn.last_used.isoformat(),
-                "query_count": conn.query_count,
-                "is_healthy": conn.is_healthy,
-                "connection_age": (datetime.utcnow() - conn.created_at).total_seconds(),
-            }
-            sessions.append(session_info)
-
-        return sessions
+        """Get session connection details - Simplified Strategy (No session caching)"""
+        # In simplified strategy, we don't maintain session connections
+        # Return empty list as connections are managed by the pool directly
+        return []

    async def generate_health_report(self) -> dict[str, Any]:
-        """Generate connection health report"""
+        """Generate connection health report - Simplified Strategy"""
        pool_status = await self.get_pool_status()
-        session_details = await self.get_session_details()

-        # Calculate health statistics
-        healthy_sessions = sum(1 for s in session_details if s["is_healthy"])
-        total_sessions = len(session_details)
-        health_ratio = healthy_sessions / total_sessions if total_sessions > 0 else 1.0
+        # Calculate pool utilization
+        pool_utilization = 1.0 - (pool_status["free_connections"] / pool_status["pool_size"]) if pool_status["pool_size"] > 0 else 0.0

        report = {
            "timestamp": datetime.utcnow().isoformat(),
            "pool_status": pool_status,
-            "session_summary": {
-                "total_sessions": total_sessions,
-                "healthy_sessions": healthy_sessions,
-                "health_ratio": health_ratio,
-            },
-            "session_details": session_details,
+            "pool_utilization": pool_utilization,
            "recommendations": [],
        }

-        # Add recommendations based on health status
-        if health_ratio < 0.8:
-            report["recommendations"].append("Consider checking database connectivity and network stability")
-
+        # Add recommendations based on pool status
        if pool_status["connection_errors"] > 10:
            report["recommendations"].append("High connection error rate detected, review connection configuration")

-        if pool_status["active_sessions"] > pool_status["pool_size"] * 0.9:
+        if pool_utilization > 0.9:
            report["recommendations"].append("Connection pool utilization is high, consider increasing pool size")

-        return report
-
-
+        if pool_status["free_connections"] == 0:
+            report["recommendations"].append("No free connections available, consider increasing pool size")
+
+        return report
\ No newline at end of file
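
Net effect of the db.py changes: the connection lifecycle collapses to acquire-use-release against the pool, with no session cache in between, so a cached wrapper can no longer hold a dead connection with a stale stream reader. A condensed sketch of that strategy (the class name and pool parameters are illustrative, not the module's actual API):

    import asyncio
    from contextlib import asynccontextmanager

    import aiomysql


    class SimplePoolManager:
        """Every query borrows a pooled connection and always returns it."""

        def __init__(self, **conn_kwargs):
            self.conn_kwargs = conn_kwargs  # host, port, user, password, db, ...
            self.pool = None

        async def start(self):
            self.pool = await aiomysql.create_pool(
                minsize=0, maxsize=10, pool_recycle=3600,
                autocommit=True, **self.conn_kwargs,
            )

        @asynccontextmanager
        async def connection(self):
            conn = await self.pool.acquire()
            try:
                yield conn
            finally:
                # always hand the connection back, even on error
                self.pool.release(conn)

        async def fetch_all(self, sql: str):
            async with self.connection() as conn:
                async with conn.cursor() as cursor:
                    await cursor.execute(sql)
                    return await cursor.fetchall()
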
diff --git a/doris_mcp_server/utils/schema_extractor.py b/doris_mcp_server/utils/schema_extractor.py
index fd711c8..db973e7 100644
--- a/doris_mcp_server/utils/schema_extractor.py
+++ b/doris_mcp_server/utils/schema_extractor.py
@@ -1215,33 +1215,39 @@ class MetadataExtractor:
        try:
            if self.connection_manager:
                import asyncio
+                import concurrent.futures
+                import threading

-                # Try to run the async query
-                try:
-                    # Check if there's a running event loop
-                    loop = asyncio.get_running_loop()
-                    # If we're in an async context, we need to run in a separate thread
-                    import concurrent.futures
-
-                    def run_in_new_loop():
-                        new_loop = asyncio.new_event_loop()
-                        asyncio.set_event_loop(new_loop)
+                # Always run in a separate thread with new event loop to avoid conflicts
+                def run_in_new_loop():
+                    # Create new event loop for this thread
+                    new_loop = asyncio.new_event_loop()
+                    asyncio.set_event_loop(new_loop)
+                    try:
+                        return new_loop.run_until_complete(
+                            self._execute_query_async(query, db_name, return_dataframe)
+                        )
+                    finally:
                        try:
-                            return new_loop.run_until_complete(
-                                self._execute_query_async(query, db_name, return_dataframe)
-                            )
+                            # Properly close the loop
+                            pending = asyncio.all_tasks(new_loop)
+                            if pending:
+                                new_loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
                        finally:
                            new_loop.close()
-
-                    with concurrent.futures.ThreadPoolExecutor() as executor:
-                        future = executor.submit(run_in_new_loop)
+
+                # Use ThreadPoolExecutor to run in separate thread
+                with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
+                    future = executor.submit(run_in_new_loop)
+                    try:
                        return future.result(timeout=30)
-
-                except RuntimeError:
-                    # No running loop, we can safely create one
-                    return asyncio.run(
-                        self._execute_query_async(query, db_name, return_dataframe)
-                    )
+                    except concurrent.futures.TimeoutError:
+                        logger.error("Query execution timed out after 30 seconds")
+                        if return_dataframe:
+                            import pandas as pd
+                            return pd.DataFrame()
+                        else:
+                            return []
            else:
                # Fallback: Return empty result
                logger.warning("No connection manager provided, returning empty result")
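
The schema_extractor.py change removes the try-the-running-loop branch entirely: the coroutine now always runs on a worker thread with its own fresh event loop, because asyncio.run() raises RuntimeError when invoked from a thread whose loop is already running. The same bridge as a standalone helper (run_coro_blocking is an illustrative name):

    import asyncio
    import concurrent.futures


    def run_coro_blocking(coro, timeout: float = 30):
        """Run a coroutine from synchronous code, regardless of whether
        the calling thread already has a running event loop."""

        def _run():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                return loop.run_until_complete(coro)
            finally:
                try:
                    # drain anything still scheduled before closing the loop
                    pending = asyncio.all_tasks(loop)
                    if pending:
                        loop.run_until_complete(
                            asyncio.gather(*pending, return_exceptions=True)
                        )
                finally:
                    loop.close()

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(_run).result(timeout=timeout)
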
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]