This is an automated email from the ASF dual-hosted git repository.

freeoneplus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-mcp-server.git


The following commit(s) were added to refs/heads/master by this push:
     new 4052b7e  [BUG]Completely solve the at_eof problem (#20)
4052b7e is described below

commit 4052b7e9389ebfe8b6441ae2d46613c16c1d7757
Author: Yijia Su <[email protected]>
AuthorDate: Thu Jul 10 13:08:32 2025 +0800

    [BUG]Completely solve the at_eof problem (#20)
    
    * fix at_eof bug
    
    * update uv.lock
    
    * fix bug and change pool min values
    
    * Fixed startup errors caused by multiple versions of MCP services
    
    * fix connection bug
---
 doris_mcp_server/utils/db.py               | 971 ++++++++++++++---------------
 doris_mcp_server/utils/schema_extractor.py |  50 +-
 2 files changed, 480 insertions(+), 541 deletions(-)
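
At the heart of this change, connection health checks stop calling aiomysql's connection.ping(), which reads the driver's internal _reader and can fail with "'NoneType' object has no attribute 'at_eof'", and instead issue a plain SELECT 1 under a timeout. A minimal sketch of that pattern, assuming Python 3.11+ (for asyncio.timeout) and an already-established aiomysql connection; the helper name safe_ping is illustrative, not part of this module:

    import asyncio
    import aiomysql

    async def safe_ping(conn: aiomysql.Connection, timeout: float = 3.0) -> bool:
        """Health-check a connection without touching aiomysql internals."""
        if conn is None or conn.closed:
            return False
        try:
            async with asyncio.timeout(timeout):  # Python 3.11+
                async with conn.cursor() as cursor:
                    await cursor.execute("SELECT 1")
                    row = await cursor.fetchone()
                    return bool(row and row[0] == 1)
        except Exception:
            # Covers asyncio.TimeoutError as well as the at_eof / 'NoneType' errors
            return False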

diff --git a/doris_mcp_server/utils/db.py b/doris_mcp_server/utils/db.py
index e04b4df..08ed7db 100644
--- a/doris_mcp_server/utils/db.py
+++ b/doris_mcp_server/utils/db.py
@@ -29,6 +29,7 @@ from contextlib import asynccontextmanager
 from dataclasses import dataclass
 from datetime import datetime
 from typing import Any, Dict, List
+import random
 
 import aiomysql
 from aiomysql import Connection, Pool
@@ -70,6 +71,7 @@ class DorisConnection:
         self.query_count = 0
         self.is_healthy = True
         self.security_manager = security_manager
+        self.logger = logging.getLogger(__name__)
 
     async def execute(self, sql: str, params: tuple | None = None, auth_context=None) -> QueryResult:
         """Execute SQL query"""
@@ -135,31 +137,46 @@ class DorisConnection:
             raise
 
     async def ping(self) -> bool:
-        """Check connection health status"""
+        """Check connection health status with enhanced at_eof error 
detection"""
         try:
-            # Check if connection exists and is not closed
+            # Check 1: Connection exists and is not closed
             if not self.connection or self.connection.closed:
                 self.is_healthy = False
                 return False
             
-            # Check if connection has _reader (aiomysql internal state)
-            # This prevents the 'NoneType' object has no attribute 'at_eof' 
error
-            if not hasattr(self.connection, '_reader') or 
self.connection._reader is None:
+            # Check 2: Use ONLY safe operations - avoid internal state access
+            # Instead of checking _reader state directly, use a simple query 
test
+            try:
+                # Use a simple query with timeout instead of ping() to avoid 
at_eof issues
+                async with asyncio.timeout(3):  # 3 second timeout
+                    async with self.connection.cursor() as cursor:
+                        await cursor.execute("SELECT 1")
+                        result = await cursor.fetchone()
+                        if result and result[0] == 1:
+                            self.is_healthy = True
+                            return True
+                        else:
+                            self.logger.debug(f"Connection {self.session_id} 
ping query returned unexpected result")
+                            self.is_healthy = False
+                            return False
+            
+            except asyncio.TimeoutError:
+                self.logger.debug(f"Connection {self.session_id} ping timed 
out")
                 self.is_healthy = False
                 return False
-            
-            # Additional check for reader's state
-            if hasattr(self.connection._reader, '_transport') and 
self.connection._reader._transport is None:
+            except Exception as query_error:
+                # Check for specific at_eof related errors
+                error_str = str(query_error).lower()
+                if 'at_eof' in error_str or 'nonetype' in error_str:
+                    self.logger.debug(f"Connection {self.session_id} ping 
failed with at_eof error: {query_error}")
+                else:
+                    self.logger.debug(f"Connection {self.session_id} ping 
failed: {query_error}")
                 self.is_healthy = False
                 return False
-            
-            # Try to ping the connection
-            await self.connection.ping()
-            self.is_healthy = True
-            return True
-        except (AttributeError, OSError, ConnectionError, Exception) as e:
-            # Log the specific error for debugging
-            logging.debug(f"Connection ping failed for session 
{self.session_id}: {e}")
+                
+        except Exception as e:
+            # Catch any other unexpected errors
+            self.logger.debug(f"Connection {self.session_id} ping failed with 
unexpected error: {e}")
             self.is_healthy = False
             return False
 
@@ -173,482 +190,410 @@ class DorisConnection:
 
 
 class DorisConnectionManager:
-    """Doris database connection manager
+    """Doris database connection manager - Simplified Strategy
 
-    Provides connection pool management, connection health monitoring, fault recovery and other functions
-    Supports session-level connection reuse and intelligent load balancing
-    Integrates security manager to provide unified security validation and data masking
+    Uses direct connection pool management without session-level caching
+    Implements connection pool health monitoring and proactive cleanup
     """
 
     def __init__(self, config, security_manager=None):
         self.config = config
+        self.security_manager = security_manager
         self.pool: Pool | None = None
-        self.session_connections: dict[str, DorisConnection] = {}
-        self.metrics = ConnectionMetrics()
         self.logger = logging.getLogger(__name__)
-        self.security_manager = security_manager
-
-        # Health check configuration
-        self.health_check_interval = config.database.health_check_interval or 
60
-        self.max_connection_age = config.database.max_connection_age or 3600
-        self.connection_timeout = config.database.connection_timeout or 30
-
-        # Start background tasks
-        self._health_check_task = None
-        self._cleanup_task = None
+        self.metrics = ConnectionMetrics()
+        
+        # Remove session-level connection management
+        # self.session_connections = {}  # REMOVED
+        
+        # Pool health monitoring
+        self.health_check_interval = 30  # seconds
+        self.pool_warmup_size = 3  # connections to maintain
+        self.pool_health_check_task = None
+        self.pool_cleanup_task = None
+        
+        # Pool recovery lock to prevent race conditions
+        self.pool_recovery_lock = asyncio.Lock()
+        self.pool_recovering = False
+        
+        # Database connection parameters from config.database
+        self.host = config.database.host
+        self.port = config.database.port
+        self.user = config.database.user
+        self.password = config.database.password
+        self.database = config.database.database
+        # Convert charset to aiomysql compatible format
+        charset_map = {"UTF8": "utf8", "UTF8MB4": "utf8mb4"}
+        self.charset = charset_map.get(config.database.charset.upper(), config.database.charset.lower())
+        self.connect_timeout = config.database.connection_timeout
+        
+        # Connection pool parameters - more conservative settings
+        self.minsize = config.database.min_connections  # This is always 0
+        self.maxsize = config.database.max_connections or 10
+        self.pool_recycle = config.database.max_connection_age or 3600  # 1 hour, more conservative
 
     async def initialize(self):
-        """Initialize connection manager"""
+        """Initialize connection pool with health monitoring"""
         try:
-            self.logger.info(f"Initializing connection pool to 
{self.config.database.host}:{self.config.database.port}")
-            
-            # Validate configuration
-            if not self.config.database.host:
-                raise ValueError("Database host is required")
-            if not self.config.database.user:
-                raise ValueError("Database user is required")
-            if not self.config.database.password:
-                self.logger.warning("Database password is empty, this may 
cause connection issues")
-            
-            # Create connection pool with improved stability parameters
-            # Key change: Set minsize=0 to avoid pre-creation issues that 
cause at_eof errors
+            self.logger.info(f"Initializing connection pool to 
{self.host}:{self.port}")
+            
+            # Create connection pool
             self.pool = await aiomysql.create_pool(
-                host=self.config.database.host,
-                port=self.config.database.port,
-                user=self.config.database.user,
-                password=self.config.database.password,
-                db=self.config.database.database,
-                charset="utf8",
-                minsize=self.config.database.min_connections,  # Always 0 per 
configuration to avoid at_eof issues
-
-                maxsize=self.config.database.max_connections or 20,
-                autocommit=True,
-                connect_timeout=self.connection_timeout,
-                # Enhanced stability parameters
-                pool_recycle=7200,  # Recycle connections every 2 hours
-                echo=False,  # Don't echo SQL statements
-            )
-
-            # Test the connection pool with a more robust test
-            if not await self._robust_connection_test():
-                raise RuntimeError("Connection pool robust test failed")
-
-            self.logger.info(
-                f"Connection pool initialized successfully with on-demand 
connection creation, "
-                f"min connections: {self.config.database.min_connections}, "
-
-                f"max connections: {self.config.database.max_connections or 
20}"
+                host=self.host,
+                port=self.port,
+                user=self.user,
+                password=self.password,
+                db=self.database,
+                charset=self.charset,
+                minsize=self.minsize,
+                maxsize=self.maxsize,
+                pool_recycle=self.pool_recycle,
+                connect_timeout=self.connect_timeout,
+                autocommit=True
             )
-
+            
+            # Test initial connection
+            if not await self._test_pool_health():
+                raise RuntimeError("Connection pool health check failed")
+            
             # Start background monitoring tasks
-            self._health_check_task = 
asyncio.create_task(self._health_check_loop())
-            self._cleanup_task = asyncio.create_task(self._cleanup_loop())
+            self.pool_health_check_task = asyncio.create_task(self._pool_health_monitor())
+            self.pool_cleanup_task = asyncio.create_task(self._pool_cleanup_monitor())
+            
+            # Perform initial pool warmup
+            await self._warmup_pool()
+            
+            self.logger.info(f"Connection pool initialized successfully, min 
connections: {self.minsize}, max connections: {self.maxsize}")
+            
+        except Exception as e:
+            self.logger.error(f"Failed to initialize connection pool: {e}")
+            raise
 
+    async def _test_pool_health(self) -> bool:
+        """Test connection pool health"""
+        try:
+            async with self.pool.acquire() as conn:
+                async with conn.cursor() as cursor:
+                    await cursor.execute("SELECT 1")
+                    result = await cursor.fetchone()
+                    return result and result[0] == 1
         except Exception as e:
-            self.logger.error(f"Connection pool initialization failed: {e}")
-            # Clean up partial initialization
-            if self.pool:
+            self.logger.error(f"Pool health test failed: {e}")
+            return False
+
+    async def _warmup_pool(self):
+        """Warm up connection pool by creating initial connections"""
+        self.logger.info(f"๐Ÿ”ฅ Warming up connection pool with 
{self.pool_warmup_size} connections")
+        
+        warmup_connections = []
+        try:
+            # Acquire connections to force pool to create them
+            for i in range(self.pool_warmup_size):
                 try:
-                    self.pool.close()
-                    await self.pool.wait_closed()
+                    conn = await self.pool.acquire()
+                    warmup_connections.append(conn)
+                    self.logger.debug(f"Warmed up connection 
{i+1}/{self.pool_warmup_size}")
+                except Exception as e:
+                    self.logger.warning(f"Failed to warm up connection {i+1}: 
{e}")
+                    break
+            
+            # Release all warmup connections back to pool
+            for conn in warmup_connections:
+                try:
+                    self.pool.release(conn)
+                except Exception as e:
+                    self.logger.warning(f"Failed to release warmup connection: 
{e}")
+            
+            self.logger.info(f"โœ… Pool warmup completed, 
{len(warmup_connections)} connections created")
+            
+        except Exception as e:
+            self.logger.error(f"Pool warmup failed: {e}")
+            # Clean up any remaining connections
+            for conn in warmup_connections:
+                try:
+                    await conn.ensure_closed()
                 except Exception:
                     pass
-                self.pool = None
-            raise
 
-    async def _robust_connection_test(self) -> bool:
-        """Perform a robust connection test that validates full connection 
health"""
-        max_retries = 3
-        for attempt in range(max_retries):
+    async def _pool_health_monitor(self):
+        """Background task to monitor pool health"""
+        self.logger.info("๐Ÿฉบ Starting pool health monitor")
+        
+        while True:
             try:
-                self.logger.debug(f"Testing connection pool (attempt {attempt 
+ 1}/{max_retries})")
-                
-                # Test connection creation and validation
-                test_conn = await self._create_raw_connection_with_validation()
-                if test_conn:
-                    # Test basic query execution
-                    async with test_conn.cursor() as cursor:
-                        await cursor.execute("SELECT 1")
-                        result = await cursor.fetchone()
-                        if result and result[0] == 1:
-                            self.logger.debug("Connection pool test 
successful")
-                            # Return connection to pool
-                            if self.pool:
-                                self.pool.release(test_conn)
-                            return True
-                        else:
-                            self.logger.warning("Connection test query 
returned unexpected result")
-                    
-                    # Close test connection if we get here
-                    await test_conn.ensure_closed()
-                
+                await asyncio.sleep(self.health_check_interval)
+                await self._check_pool_health()
+            except asyncio.CancelledError:
+                self.logger.info("Pool health monitor stopped")
+                break
             except Exception as e:
-                self.logger.warning(f"Connection test attempt {attempt + 1} 
failed: {e}")
-                if attempt == max_retries - 1:
-                    self.logger.error("All connection test attempts failed")
-                    return False
-                else:
-                    # Wait before retry
-                    await asyncio.sleep(1.0 * (attempt + 1))
-        
-        return False
+                self.logger.error(f"Pool health monitor error: {e}")
 
-    async def _create_raw_connection_with_validation(self, max_retries: int = 
3):
-        """Create a raw connection with comprehensive validation"""
-        for attempt in range(max_retries):
+    async def _pool_cleanup_monitor(self):
+        """Background task to clean up stale connections"""
+        self.logger.info("๐Ÿงน Starting pool cleanup monitor")
+        
+        while True:
             try:
-                if not self.pool:
-                    raise RuntimeError("Connection pool not initialized")
-
-                # Acquire connection from pool
-                raw_connection = await self.pool.acquire()
-                
-                # Basic connection validation
-                if not raw_connection:
-                    self.logger.warning(f"Pool returned None connection 
(attempt {attempt + 1})")
-                    continue
-                
-                if raw_connection.closed:
-                    self.logger.warning(f"Pool returned closed connection 
(attempt {attempt + 1})")
-                    continue
-                
-                # Perform a simple ping test instead of checking internal state
-                # Internal state (_reader, _transport) might not be fully 
initialized yet
-                try:
-                    # Test basic connectivity with a simple query
-                    async with raw_connection.cursor() as cursor:
-                        await cursor.execute("SELECT 1")
-                        result = await cursor.fetchone()
-                        if result and result[0] == 1:
-                            self.logger.debug(f"Successfully created and 
validated raw connection (attempt {attempt + 1})")
-                            return raw_connection
-                        else:
-                            self.logger.warning(f"Connection test query failed 
(attempt {attempt + 1})")
-                            await raw_connection.ensure_closed()
-                            continue
-                            
-                except Exception as e:
-                    # Check if this is an at_eof error specifically
-                    error_str = str(e).lower()
-                    if 'at_eof' in error_str or 'nonetype' in error_str:
-                        self.logger.warning(f"Connection has at_eof issue 
(attempt {attempt + 1}): {e}")
-                    else:
-                        self.logger.warning(f"Connection test failed (attempt 
{attempt + 1}): {e}")
-                    
-                    try:
-                        await raw_connection.ensure_closed()
-                    except Exception:
-                        pass
-                    continue
-                
+                await asyncio.sleep(self.health_check_interval * 2)  # Less frequent cleanup
+                await self._cleanup_stale_connections()
+            except asyncio.CancelledError:
+                self.logger.info("Pool cleanup monitor stopped")
+                break
             except Exception as e:
-                self.logger.warning(f"Raw connection creation attempt {attempt 
+ 1} failed: {e}")
-                if attempt == max_retries - 1:
-                    raise RuntimeError(f"Failed to create valid connection 
after {max_retries} attempts: {e}")
-                else:
-                    # Exponential backoff
-                    await asyncio.sleep(0.5 * (2 ** attempt))
-        
-        raise RuntimeError("Failed to create valid connection")
-
-    async def get_connection(self, session_id: str) -> DorisConnection:
-        """Get database connection with enhanced reliability
-
-        Supports session-level connection reuse to improve performance and 
consistency
-        """
-        # Check if there's an existing session connection
-        if session_id in self.session_connections:
-            conn = self.session_connections[session_id]
-            # Enhanced connection health check
-            if await self._comprehensive_connection_health_check(conn):
-                return conn
-            else:
-                # Connection is unhealthy, clean up and create new one
-                self.logger.debug(f"Existing connection unhealthy for session 
{session_id}, creating new one")
-                await self._cleanup_session_connection(session_id)
-
-        # Create new connection with retry logic
-        return await self._create_new_connection_with_retry(session_id)
+                self.logger.error(f"Pool cleanup monitor error: {e}")
 
-    async def _comprehensive_connection_health_check(self, conn: 
DorisConnection) -> bool:
-        """Perform comprehensive connection health check"""
+    async def _check_pool_health(self):
+        """Check and maintain pool health"""
         try:
-            # Check basic connection state
-            if not conn.connection or conn.connection.closed:
-                return False
-            
-            # Instead of checking internal state, perform a simple ping test
-            # This is more reliable and less dependent on aiomysql internals
-            if not await conn.ping():
-                return False
-            
-            return True
+            # Skip health check if already recovering
+            if self.pool_recovering:
+                self.logger.debug("Pool recovery in progress, skipping health 
check")
+                return
+                
+            # Test pool with a simple query
+            health_ok = await self._test_pool_health()
             
-        except Exception as e:
-            # Check for at_eof errors specifically
-            error_str = str(e).lower()
-            if 'at_eof' in error_str:
-                self.logger.debug(f"Connection health check failed with at_eof 
error: {e}")
+            if health_ok:
+                self.logger.debug("โœ… Pool health check passed")
+                self.metrics.last_health_check = datetime.utcnow()
             else:
-                self.logger.debug(f"Connection health check failed: {e}")
-            return False
-
-    async def _create_new_connection_with_retry(self, session_id: str, 
max_retries: int = 3) -> DorisConnection:
-        """Create new database connection with retry logic"""
-        for attempt in range(max_retries):
-            try:
-                # Get validated raw connection
-                raw_connection = await 
self._create_raw_connection_with_validation()
+                self.logger.warning("โŒ Pool health check failed, attempting 
recovery")
+                await self._recover_pool()
                 
-                # Create wrapped connection
-                doris_conn = DorisConnection(raw_connection, session_id, 
self.security_manager)
-                
-                # Comprehensive connection test
-                if await 
self._comprehensive_connection_health_check(doris_conn):
-                    # Store in session connections
-                    self.session_connections[session_id] = doris_conn
-                    self.metrics.total_connections += 1
-                    self.logger.debug(f"Successfully created new connection 
for session: {session_id}")
-                    return doris_conn
-                else:
-                    # Connection failed health check, clean up and retry
-                    self.logger.warning(f"New connection failed health check 
for session {session_id} (attempt {attempt + 1})")
-                    try:
-                        await doris_conn.close()
-                    except Exception:
-                        pass
-                    
-            except Exception as e:
-                self.logger.warning(f"Connection creation attempt {attempt + 
1} failed for session {session_id}: {e}")
-                if attempt == max_retries - 1:
-                    self.metrics.connection_errors += 1
-                    raise RuntimeError(f"Failed to create connection for 
session {session_id} after {max_retries} attempts: {e}")
-                else:
-                    # Exponential backoff
-                    await asyncio.sleep(0.5 * (2 ** attempt))
-        
-        raise RuntimeError(f"Unexpected failure in connection creation for 
session {session_id}")
-
-    async def release_connection(self, session_id: str):
-        """Release session connection"""
-        if session_id in self.session_connections:
-            await self._cleanup_session_connection(session_id)
+        except Exception as e:
+            self.logger.error(f"Pool health check error: {e}")
+            await self._recover_pool()
 
-    async def _cleanup_session_connection(self, session_id: str):
-        """Clean up session connection with enhanced safety"""
-        if session_id in self.session_connections:
-            conn = self.session_connections[session_id]
-            try:
-                # Simplified connection validation before returning to pool
-                connection_healthy = False
+    async def _cleanup_stale_connections(self):
+        """Proactively clean up potentially stale connections"""
+        try:
+            self.logger.debug("๐Ÿงน Checking for stale connections")
+            
+            # Get pool statistics
+            pool_size = self.pool.size
+            pool_free = self.pool.freesize
+            
+            # If pool has idle connections, test some of them
+            if pool_free > 0:
+                test_count = min(pool_free, 2)  # Test up to 2 idle connections
                 
-                if (self.pool and 
-                    conn.connection and 
-                    not conn.connection.closed):
-                    
-                    # Test if connection is still healthy with a simple check
+                for i in range(test_count):
                     try:
-                        # Quick ping test to see if connection is usable
-                        async with conn.connection.cursor() as cursor:
-                            await cursor.execute("SELECT 1")
+                        # Acquire connection, test it, and release
+                        conn = await asyncio.wait_for(self.pool.acquire(), 
timeout=5)
+                        
+                        # Quick test
+                        async with conn.cursor() as cursor:
+                            await asyncio.wait_for(cursor.execute("SELECT 1"), 
timeout=3)
                             await cursor.fetchone()
-                        connection_healthy = True
-                    except Exception as test_error:
-                        self.logger.debug(f"Connection health test failed for 
session {session_id}: {test_error}")
-                        connection_healthy = False
-                
-                if connection_healthy:
-                    # Connection appears healthy, return to pool
-                    try:
-                        self.pool.release(conn.connection)
-                        self.logger.debug(f"Successfully returned connection 
to pool for session {session_id}")
-                    except Exception as pool_error:
-                        self.logger.debug(f"Failed to return connection to 
pool for session {session_id}: {pool_error}")
+                        
+                        # Connection is healthy, release it
+                        self.pool.release(conn)
+                        
+                    except asyncio.TimeoutError:
+                        self.logger.debug(f"Stale connection test {i+1} timed 
out")
                         try:
-                            await conn.connection.ensure_closed()
+                            await conn.ensure_closed()
                         except Exception:
                             pass
-                else:
-                    # Connection is unhealthy, force close
-                    self.logger.debug(f"Connection unhealthy for session 
{session_id}, force closing")
+                    except Exception as e:
+                        self.logger.debug(f"Stale connection test {i+1} 
failed: {e}")
+                        try:
+                            await conn.ensure_closed()
+                        except Exception:
+                            pass
+                
+                self.logger.debug(f"Stale connection cleanup completed, tested 
{test_count} connections")
+                
+        except Exception as e:
+            self.logger.error(f"Stale connection cleanup error: {e}")
+
+    async def _recover_pool(self):
+        """Recover connection pool when health check fails"""
+        # Use lock to prevent concurrent recovery attempts
+        async with self.pool_recovery_lock:
+            # Check if another recovery is already in progress
+            if self.pool_recovering:
+                self.logger.debug("Pool recovery already in progress, 
waiting...")
+                return
+                
+            try:
+                self.pool_recovering = True
+                max_retries = 3
+                retry_delay = 5  # seconds
+                
+                for attempt in range(max_retries):
                     try:
-                        if conn.connection and not conn.connection.closed:
-                            await conn.connection.ensure_closed()
-                    except Exception:
-                        pass  # Ignore errors during forced close
+                        self.logger.info(f"๐Ÿ”„ Attempting pool recovery (attempt 
{attempt + 1}/{max_retries})")
+                        
+                        # Try to close existing pool with timeout
+                        if self.pool:
+                            try:
+                                if not self.pool.closed:
+                                    self.pool.close()
+                                    await asyncio.wait_for(self.pool.wait_closed(), timeout=3.0)
+                                self.logger.debug("Old pool closed successfully")
+                            except asyncio.TimeoutError:
+                                self.logger.warning("Pool close timeout, 
forcing cleanup")
+                            except Exception as e:
+                                self.logger.warning(f"Error closing old pool: 
{e}")
+                            finally:
+                                self.pool = None
+                        
+                        # Wait before creating new pool (reduced delay)
+                        if attempt > 0:
+                            await asyncio.sleep(2)  # Reduced from 5 to 2 
seconds
+                        
+                        # Recreate pool with timeout
+                        self.logger.debug("Creating new connection pool...")
+                        self.pool = await asyncio.wait_for(
+                            aiomysql.create_pool(
+                                host=self.host,
+                                port=self.port,
+                                user=self.user,
+                                password=self.password,
+                                db=self.database,
+                                charset=self.charset,
+                                minsize=self.minsize,
+                                maxsize=self.maxsize,
+                                pool_recycle=self.pool_recycle,
+                                connect_timeout=self.connect_timeout,
+                                autocommit=True
+                            ),
+                            timeout=10.0
+                        )
+                        
+                        # Test recovered pool with timeout
+                        if await asyncio.wait_for(self._test_pool_health(), timeout=5.0):
+                            self.logger.info(f"✅ Pool recovery successful on attempt {attempt + 1}")
+                            # Re-warm the pool with timeout
+                            try:
+                                await asyncio.wait_for(self._warmup_pool(), timeout=5.0)
+                            except asyncio.TimeoutError:
+                                self.logger.warning("Pool warmup timeout, but recovery successful")
+                            return
+                        else:
+                            self.logger.warning(f"โŒ Pool recovery health check 
failed on attempt {attempt + 1}")
+                            
+                    except asyncio.TimeoutError:
+                        self.logger.error(f"Pool recovery attempt {attempt + 
1} timed out")
+                        if self.pool:
+                            try:
+                                self.pool.close()
+                            except:
+                                pass
+                            self.pool = None
+                    except Exception as e:
+                        self.logger.error(f"Pool recovery error on attempt 
{attempt + 1}: {e}")
+                        
+                        # Clean up failed pool
+                        if self.pool:
+                            try:
+                                self.pool.close()
+                                await 
asyncio.wait_for(self.pool.wait_closed(), timeout=2.0)
+                            except Exception:
+                                pass
+                            finally:
+                                self.pool = None
                 
-                # Close connection wrapper
-                await conn.close()
+                # All recovery attempts failed
+                self.logger.error("โŒ Pool recovery failed after all attempts")
+                self.pool = None
                 
-            except Exception as e:
-                self.logger.error(f"Error cleaning up connection for session 
{session_id}: {e}")
-                # Force close if normal cleanup fails
-                try:
-                    if conn.connection and not conn.connection.closed:
-                        await conn.connection.ensure_closed()
-                except Exception:
-                    pass  # Ignore errors during forced close
             finally:
-                # Remove from session connections
-                del self.session_connections[session_id]
-                self.logger.debug(f"Cleaned up connection for session: 
{session_id}")
-
-    async def _health_check_loop(self):
-        """Background health check loop"""
-        while True:
-            try:
-                await asyncio.sleep(self.health_check_interval)
-                await self._perform_health_check()
-            except asyncio.CancelledError:
-                break
-            except Exception as e:
-                self.logger.error(f"Health check error: {e}")
+                self.pool_recovering = False
 
-    async def _perform_health_check(self):
-        """Perform enhanced health check"""
+    async def get_connection(self, session_id: str) -> DorisConnection:
+        """Get database connection - Simplified Strategy with pool validation
+        
+        Always acquire fresh connection from pool, no session caching
+        """
         try:
-            unhealthy_sessions = []
-            
-            # Enhanced health check with comprehensive validation
-            for session_id, conn in self.session_connections.items():
-                if not await self._comprehensive_connection_health_check(conn):
-                    unhealthy_sessions.append(session_id)
-            
-            # Check for stale connections (over 30 minutes old)
-            current_time = datetime.utcnow()
-            stale_sessions = []
-            for session_id, conn in self.session_connections.items():
-                if session_id not in unhealthy_sessions:  # Don't double-check
-                    last_used_delta = (current_time - 
conn.last_used).total_seconds()
-                    if last_used_delta > 1800:  # 30 minutes
-                        # Force a comprehensive health check for stale 
connections
-                        if not await 
self._comprehensive_connection_health_check(conn):
-                            stale_sessions.append(session_id)
-            
-            all_problematic_sessions = list(set(unhealthy_sessions + 
stale_sessions))
-            
-            # Clean up problematic connections
-            for session_id in all_problematic_sessions:
-                await self._cleanup_session_connection(session_id)
-                self.metrics.failed_connections += 1
-            
-            # Update metrics
-            await self._update_connection_metrics()
-            self.metrics.last_health_check = datetime.utcnow()
-            
-            if all_problematic_sessions:
-                self.logger.warning(f"Health check: cleaned up 
{len(unhealthy_sessions)} unhealthy and {len(stale_sessions)} stale 
connections")
-            else:
-                self.logger.debug(f"Health check: all 
{len(self.session_connections)} connections healthy")
+            # Wait for any ongoing recovery to complete
+            if self.pool_recovering:
+                self.logger.debug(f"Pool recovery in progress, waiting for 
completion...")
+                # Wait for recovery to complete (max 10 seconds)
+                for _ in range(10):
+                    if not self.pool_recovering:
+                        break
+                    await asyncio.sleep(0.5)
+                
+                if self.pool_recovering:
+                    self.logger.error("Pool recovery is taking too long, 
proceeding anyway")
+                    # Don't raise error, try to continue
+            
+            # Check if pool is available
+            if not self.pool:
+                self.logger.warning("Connection pool is not available, 
attempting recovery...")
+                await self._recover_pool()
+                
+                if not self.pool:
+                    raise RuntimeError("Connection pool is not available and 
recovery failed")
+            
+            # Check if pool is closed
+            if self.pool.closed:
+                self.logger.warning("Connection pool is closed, attempting 
recovery...")
+                await self._recover_pool()
+                
+                if not self.pool or self.pool.closed:
+                    raise RuntimeError("Connection pool is closed and recovery 
failed")
+            
+            # Simple strategy: always get fresh connection from pool
+            raw_conn = await self.pool.acquire()
+            
+            # Wrap in DorisConnection
+            doris_conn = DorisConnection(raw_conn, session_id, 
self.security_manager)
+            
+            # Simple validation - just check if connection is open
+            if raw_conn.closed:
+                raise RuntimeError("Acquired connection is already closed")
+            
+            self.logger.debug(f"โœ… Acquired fresh connection for session 
{session_id}")
+            return doris_conn
+            
+        except Exception as e:
+            self.logger.error(f"Failed to get connection for session 
{session_id}: {e}")
+            raise
 
+    async def release_connection(self, session_id: str, connection: DorisConnection):
+        """Release connection back to pool - Simplified Strategy"""
+        try:
+            if connection and connection.connection:
+                # Simple strategy: always return to pool
+                if not connection.connection.closed:
+                    self.pool.release(connection.connection)
+                    self.logger.debug(f"โœ… Released connection for session 
{session_id}")
+                else:
+                    self.logger.debug(f"Connection already closed for session 
{session_id}")
+            
         except Exception as e:
-            self.logger.error(f"Health check failed: {e}")
-            # If health check fails, try to diagnose the issue
+            self.logger.error(f"Error releasing connection for session 
{session_id}: {e}")
+            # Force close if release fails
             try:
-                diagnosis = await self.diagnose_connection_health()
-                self.logger.error(f"Connection diagnosis: {diagnosis}")
+                if connection and connection.connection:
+                    await connection.connection.ensure_closed()
             except Exception:
-                pass  # Don't let diagnosis failure crash health check
-
-    async def _cleanup_loop(self):
-        """Background cleanup loop"""
-        while True:
-            try:
-                await asyncio.sleep(300)  # Run every 5 minutes
-                await self._cleanup_idle_connections()
-            except asyncio.CancelledError:
-                break
-            except Exception as e:
-                self.logger.error(f"Cleanup loop error: {e}")
-
-    async def _cleanup_idle_connections(self):
-        """Clean up idle connections"""
-        current_time = datetime.utcnow()
-        idle_sessions = []
-        
-        for session_id, conn in self.session_connections.items():
-            # Check if connection has exceeded maximum age
-            age = (current_time - conn.created_at).total_seconds()
-            if age > self.max_connection_age:
-                idle_sessions.append(session_id)
-        
-        # Clean up idle connections
-        for session_id in idle_sessions:
-            await self._cleanup_session_connection(session_id)
-        
-        if idle_sessions:
-            self.logger.info(f"Cleaned up {len(idle_sessions)} idle 
connections")
-
-    async def _update_connection_metrics(self):
-        """Update connection metrics"""
-        self.metrics.active_connections = len(self.session_connections)
-        if self.pool:
-            self.metrics.idle_connections = self.pool.freesize
-
-    async def get_metrics(self) -> ConnectionMetrics:
-        """Get connection metrics"""
-        await self._update_connection_metrics()
-        return self.metrics
-
-    async def execute_query(
-        self, session_id: str, sql: str, params: tuple | None = None, 
auth_context=None
-    ) -> QueryResult:
-        """Execute query with enhanced error handling and retry logic"""
-        max_retries = 2
-        for attempt in range(max_retries):
-            try:
-                conn = await self.get_connection(session_id)
-                return await conn.execute(sql, params, auth_context)
-            except Exception as e:
-                error_msg = str(e).lower()
-                # Check for connection-related errors that warrant retry
-                is_connection_error = any(keyword in error_msg for keyword in [
-                    'at_eof', 'connection', 'closed', 'nonetype', 'reader', 
'transport'
-                ])
-                
-                if is_connection_error and attempt < max_retries - 1:
-                    self.logger.warning(f"Connection error during query 
execution (attempt {attempt + 1}): {e}")
-                    # Clean up the problematic connection
-                    await self.release_connection(session_id)
-                    # Wait before retry
-                    await asyncio.sleep(0.5 * (attempt + 1))
-                    continue
-                else:
-                    # Not a connection error or final retry - re-raise
-                    raise
-
-    @asynccontextmanager
-    async def get_connection_context(self, session_id: str):
-        """Get connection context manager"""
-        conn = await self.get_connection(session_id)
-        try:
-            yield conn
-        finally:
-            # Connection will be reused, no need to close here
-            pass
+                pass
 
     async def close(self):
         """Close connection manager"""
         try:
             # Cancel background tasks
-            if self._health_check_task:
-                self._health_check_task.cancel()
+            if self.pool_health_check_task:
+                self.pool_health_check_task.cancel()
                 try:
-                    await self._health_check_task
+                    await self.pool_health_check_task
                 except asyncio.CancelledError:
                     pass
 
-            if self._cleanup_task:
-                self._cleanup_task.cancel()
+            if self.pool_cleanup_task:
+                self.pool_cleanup_task.cancel()
                 try:
-                    await self._cleanup_task
+                    await self.pool_cleanup_task
                 except asyncio.CancelledError:
                     pass
 
-            # Clean up all session connections
-            for session_id in list(self.session_connections.keys()):
-                await self._cleanup_session_connection(session_id)
-
             # Close connection pool
             if self.pool:
                 self.pool.close()
@@ -661,15 +606,62 @@ class DorisConnectionManager:
 
     async def test_connection(self) -> bool:
         """Test database connection using robust connection test"""
-        return await self._robust_connection_test()
+        return await self._test_pool_health()
+
+    async def get_metrics(self) -> ConnectionMetrics:
+        """Get connection pool metrics - Simplified Strategy"""
+        try:
+            if self.pool:
+                self.metrics.idle_connections = self.pool.freesize
+                self.metrics.active_connections = self.pool.size - 
self.pool.freesize
+            else:
+                self.metrics.idle_connections = 0
+                self.metrics.active_connections = 0
+            
+            return self.metrics
+        except Exception as e:
+            self.logger.error(f"Error getting metrics: {e}")
+            return self.metrics
+
+    async def execute_query(
+        self, session_id: str, sql: str, params: tuple | None = None, auth_context=None
+    ) -> QueryResult:
+        """Execute query - Simplified Strategy with automatic connection management"""
+        connection = None
+        try:
+            # Always get fresh connection from pool
+            connection = await self.get_connection(session_id)
+            
+            # Execute query
+            result = await connection.execute(sql, params, auth_context)
+            
+            return result
+            
+        except Exception as e:
+            self.logger.error(f"Query execution failed for session 
{session_id}: {e}")
+            raise
+        finally:
+            # Always release connection back to pool
+            if connection:
+                await self.release_connection(session_id, connection)
+
+    @asynccontextmanager
+    async def get_connection_context(self, session_id: str):
+        """Get connection context manager - Simplified Strategy"""
+        connection = None
+        try:
+            connection = await self.get_connection(session_id)
+            yield connection
+        finally:
+            if connection:
+                await self.release_connection(session_id, connection)
 
     async def diagnose_connection_health(self) -> Dict[str, Any]:
-        """Diagnose connection pool and session health"""
+        """Diagnose connection pool health - Simplified Strategy"""
         diagnosis = {
             "timestamp": datetime.utcnow().isoformat(),
             "pool_status": "unknown",
-            "session_connections": {},
-            "problematic_connections": [],
+            "pool_info": {},
             "recommendations": []
         }
         
@@ -693,55 +685,16 @@ class DorisConnectionManager:
                 "max_size": self.pool.maxsize
             }
             
-            # Check session connections
-            problematic_sessions = []
-            for session_id, conn in self.session_connections.items():
-                conn_status = {
-                    "session_id": session_id,
-                    "created_at": conn.created_at.isoformat(),
-                    "last_used": conn.last_used.isoformat(),
-                    "query_count": conn.query_count,
-                    "is_healthy": conn.is_healthy
-                }
-                
-                # Detailed connection checks
-                if conn.connection:
-                    conn_status["connection_closed"] = conn.connection.closed
-                    conn_status["has_reader"] = hasattr(conn.connection, 
'_reader') and conn.connection._reader is not None
-                    
-                    if hasattr(conn.connection, '_reader') and 
conn.connection._reader:
-                        conn_status["reader_transport"] = 
conn.connection._reader._transport is not None
-                    else:
-                        conn_status["reader_transport"] = False
-                else:
-                    conn_status["connection_closed"] = True
-                    conn_status["has_reader"] = False
-                    conn_status["reader_transport"] = False
-                
-                # Check if connection is problematic
-                if (not conn.is_healthy or 
-                    conn_status["connection_closed"] or 
-                    not conn_status["has_reader"] or 
-                    not conn_status["reader_transport"]):
-                    problematic_sessions.append(session_id)
-                    diagnosis["problematic_connections"].append(conn_status)
-                
-                diagnosis["session_connections"][session_id] = conn_status
-            
-            # Generate recommendations
-            if problematic_sessions:
-                diagnosis["recommendations"].append(f"Clean up 
{len(problematic_sessions)} problematic connections")
-            
+            # Generate recommendations based on pool status
             if self.pool.freesize == 0 and self.pool.size >= self.pool.maxsize:
                 diagnosis["recommendations"].append("Connection pool exhausted 
- consider increasing max_connections")
             
-            # Auto-cleanup problematic connections
-            for session_id in problematic_sessions:
-                try:
-                    await self._cleanup_session_connection(session_id)
-                    self.logger.info(f"Auto-cleaned problematic connection for 
session: {session_id}")
-                except Exception as e:
-                    self.logger.error(f"Failed to auto-clean session 
{session_id}: {e}")
+            # Test pool health
+            if await self._test_pool_health():
+                diagnosis["pool_health"] = "healthy"
+            else:
+                diagnosis["pool_health"] = "unhealthy"
+                diagnosis["recommendations"].append("Pool health check failed 
- may need recovery")
             
             return diagnosis
             
@@ -768,7 +721,8 @@ class ConnectionPoolMonitor:
         status = {
             "pool_size": self.connection_manager.pool.size if 
self.connection_manager.pool else 0,
             "free_connections": self.connection_manager.pool.freesize if 
self.connection_manager.pool else 0,
-            "active_sessions": 
len(self.connection_manager.session_connections),
+            "active_connections": metrics.active_connections,
+            "idle_connections": metrics.idle_connections,
             "total_connections": metrics.total_connections,
             "failed_connections": metrics.failed_connections,
             "connection_errors": metrics.connection_errors,
@@ -779,54 +733,33 @@ class ConnectionPoolMonitor:
         return status
 
     async def get_session_details(self) -> list[dict[str, Any]]:
-        """Get session connection details"""
-        sessions = []
-        
-        for session_id, conn in 
self.connection_manager.session_connections.items():
-            session_info = {
-                "session_id": session_id,
-                "created_at": conn.created_at.isoformat(),
-                "last_used": conn.last_used.isoformat(),
-                "query_count": conn.query_count,
-                "is_healthy": conn.is_healthy,
-                "connection_age": (datetime.utcnow() - 
conn.created_at).total_seconds(),
-            }
-            sessions.append(session_info)
-        
-        return sessions
+        """Get session connection details - Simplified Strategy (No session 
caching)"""
+        # In simplified strategy, we don't maintain session connections
+        # Return empty list as connections are managed by the pool directly
+        return []
 
     async def generate_health_report(self) -> dict[str, Any]:
-        """Generate connection health report"""
+        """Generate connection health report - Simplified Strategy"""
         pool_status = await self.get_pool_status()
-        session_details = await self.get_session_details()
         
-        # Calculate health statistics
-        healthy_sessions = sum(1 for s in session_details if s["is_healthy"])
-        total_sessions = len(session_details)
-        health_ratio = healthy_sessions / total_sessions if total_sessions > 0 
else 1.0
+        # Calculate pool utilization
+        pool_utilization = 1.0 - (pool_status["free_connections"] / pool_status["pool_size"]) if pool_status["pool_size"] > 0 else 0.0
         
         report = {
             "timestamp": datetime.utcnow().isoformat(),
             "pool_status": pool_status,
-            "session_summary": {
-                "total_sessions": total_sessions,
-                "healthy_sessions": healthy_sessions,
-                "health_ratio": health_ratio,
-            },
-            "session_details": session_details,
+            "pool_utilization": pool_utilization,
             "recommendations": [],
         }
         
-        # Add recommendations based on health status
-        if health_ratio < 0.8:
-            report["recommendations"].append("Consider checking database 
connectivity and network stability")
-        
+        # Add recommendations based on pool status
         if pool_status["connection_errors"] > 10:
             report["recommendations"].append("High connection error rate 
detected, review connection configuration")
         
-        if pool_status["active_sessions"] > pool_status["pool_size"] * 0.9:
+        if pool_utilization > 0.9:
             report["recommendations"].append("Connection pool utilization is 
high, consider increasing pool size")
         
-        return report
-
-
+        if pool_status["free_connections"] == 0:
+            report["recommendations"].append("No free connections available, 
consider increasing pool size")
+        
+        return report
\ No newline at end of file
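
On the manager side, session-level connection caching is gone: DorisConnectionManager.execute_query() now acquires a fresh pooled connection for every query and returns it to the pool in a finally block, with background tasks watching pool health. A rough usage sketch, assuming a config object exposing the database settings the constructor reads (config.database.host, port, user, password, database, and so on); load_config() below is only a placeholder for the project's config loading:

    import asyncio
    from doris_mcp_server.utils.db import DorisConnectionManager

    async def main(config):
        manager = DorisConnectionManager(config)
        await manager.initialize()        # creates the pool, warms it up, starts monitors
        try:
            # Acquires a pooled connection, runs the query, releases it in a finally block
            result = await manager.execute_query("session-1", "SHOW DATABASES")
            print(result)
        finally:
            await manager.close()         # cancels monitors and closes the pool

    # asyncio.run(main(load_config()))    # load_config() is a placeholder, not part of this diff
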
diff --git a/doris_mcp_server/utils/schema_extractor.py b/doris_mcp_server/utils/schema_extractor.py
index fd711c8..db973e7 100644
--- a/doris_mcp_server/utils/schema_extractor.py
+++ b/doris_mcp_server/utils/schema_extractor.py
@@ -1215,33 +1215,39 @@ class MetadataExtractor:
         try:
             if self.connection_manager:
                 import asyncio
+                import concurrent.futures
+                import threading
                 
-                # Try to run the async query
-                try:
-                    # Check if there's a running event loop
-                    loop = asyncio.get_running_loop()
-                    # If we're in an async context, we need to run in a 
separate thread
-                    import concurrent.futures
-                    
-                    def run_in_new_loop():
-                        new_loop = asyncio.new_event_loop()
-                        asyncio.set_event_loop(new_loop)
+                # Always run in a separate thread with new event loop to avoid conflicts
+                def run_in_new_loop():
+                    # Create new event loop for this thread
+                    new_loop = asyncio.new_event_loop()
+                    asyncio.set_event_loop(new_loop)
+                    try:
+                        return new_loop.run_until_complete(
+                            self._execute_query_async(query, db_name, 
return_dataframe)
+                        )
+                    finally:
                         try:
-                            return new_loop.run_until_complete(
-                                self._execute_query_async(query, db_name, 
return_dataframe)
-                            )
+                            # Properly close the loop
+                            pending = asyncio.all_tasks(new_loop)
+                            if pending:
+                                new_loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
                         finally:
                             new_loop.close()
-                    
-                    with concurrent.futures.ThreadPoolExecutor() as executor:
-                        future = executor.submit(run_in_new_loop)
+                
+                # Use ThreadPoolExecutor to run in separate thread
+                with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
+                    future = executor.submit(run_in_new_loop)
+                    try:
                         return future.result(timeout=30)
-                        
-                except RuntimeError:
-                    # No running loop, we can safely create one
-                    return asyncio.run(
-                        self._execute_query_async(query, db_name, 
return_dataframe)
-                    )
+                    except concurrent.futures.TimeoutError:
+                        logger.error("Query execution timed out after 30 
seconds")
+                        if return_dataframe:
+                            import pandas as pd
+                            return pd.DataFrame()
+                        else:
+                            return []
             else:
                 # Fallback: Return empty result
                 logger.warning("No connection manager provided, returning 
empty result")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
