This is an automated email from the ASF dual-hosted git repository.

freeoneplus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-mcp-server.git


The following commit(s) were added to refs/heads/master by this push:
     new c1e3b13  [Performance]Optimizing concurrent startup capabilities (#48)
c1e3b13 is described below

commit c1e3b1385150a64784fc9028ace7f3ffae7e1e91
Author: Yijia Su <[email protected]>
AuthorDate: Tue Sep 2 13:39:05 2025 +0800

    [Performance]Optimizing concurrent startup capabilities (#48)
    
    * 0.5.1 Version
    
    * fix 0.5.1 schema async bug
    
    * fix security bug
    
    * fix security bug
---
 doris_mcp_server/main.py            |  74 ++++--
 doris_mcp_server/multiworker_app.py | 509 ++++++++++++++++++++++++++++++++++++
 start_server.sh                     |   4 +-
 3 files changed, 568 insertions(+), 19 deletions(-)

diff --git a/doris_mcp_server/main.py b/doris_mcp_server/main.py
index e6f9f70..ccf0c29 100644
--- a/doris_mcp_server/main.py
+++ b/doris_mcp_server/main.py
@@ -221,6 +221,8 @@ logger = logging.getLogger(__name__)
 _default_config = DorisConfig()
 
 
+
+
 class DorisServer:
     """Apache Doris MCP Server main class"""
 
@@ -449,9 +451,9 @@ class DorisServer:
 
 
 
-    async def start_http(self, host: str = os.getenv("SERVER_HOST", 
_default_config.database.host), port: int = os.getenv("SERVER_PORT", 
_default_config.server_port)):
-        """Start Streamable HTTP transport mode"""
-        self.logger.info(f"Starting Doris MCP Server (Streamable HTTP mode) - 
{host}:{port}")
+    async def start_http(self, host: str = os.getenv("SERVER_HOST", 
_default_config.database.host), port: int = os.getenv("SERVER_PORT", 
_default_config.server_port), workers: int = 1):
+        """Start Streamable HTTP transport mode with workers support"""
+        self.logger.info(f"Starting Doris MCP Server (Streamable HTTP mode) - 
{host}:{port}, workers: {workers}")
 
         try:
             # Ensure connection manager is initialized
@@ -568,19 +570,35 @@ class DorisServer:
                     self.logger.warning(f"Unsupported scope type: 
{scope['type']}")
                     return
             
-            # Start uvicorn server with session manager lifecycle
-            config = uvicorn.Config(
-                app=mcp_app,
-                host=host,
-                port=port,
-                log_level="info"
-            )
-            server = uvicorn.Server(config)
-            
-            # Run session manager and server together
-            async with session_manager.run():
-                self.logger.info("Session manager started, now starting HTTP 
server")
-                await server.serve()
+            # Choose startup method based on worker count
+            if workers > 1:
+                self.logger.info(f"Using multi-process mode with {workers} 
workers")
+                self.logger.info("Note: Multi-worker mode provides full MCP 
functionality with independent worker processes")
+                
+                # Use the dedicated multiworker app module with full MCP 
support
+                uvicorn.run(
+                    "doris_mcp_server.multiworker_app:app",
+                    host=host,
+                    port=port,
+                    workers=workers,
+                    log_level="info"
+                )
+                
+            else:
+                self.logger.info("Using single-process mode")
+                # Single worker mode, use original logic with session manager 
lifecycle
+                config = uvicorn.Config(
+                    app=mcp_app,
+                    host=host,
+                    port=port,
+                    log_level="info"
+                )
+                server = uvicorn.Server(config)
+                
+                # Run session manager and server together
+                async with session_manager.run():
+                    self.logger.info("Session manager started, now starting 
HTTP server")
+                    await server.serve()
 
         except Exception as e:
             self.logger.error(f"Streamable HTTP server startup failed: {e}")
@@ -595,6 +613,8 @@ class DorisServer:
                     self.logger.error(f"  Exception {i+1}: 
{type(exc).__name__}: {exc}")
             raise
 
+
+
     async def shutdown(self):
         """Shutdown server"""
         self.logger.info("Shutting down Doris MCP Server")
@@ -648,6 +668,13 @@ Examples:
         help=f"Port number for HTTP mode (default: 
{_default_config.server_port})"
     )
 
+    parser.add_argument(
+        "--workers",
+        type=int,
+        default=int(os.getenv("WORKERS", "1")),
+        help="Number of worker processes for HTTP mode (default: 1, use 0 for 
auto-detect CPU cores)"
+    )
+
     parser.add_argument(
         "--doris-host", "--db-host",
         type=str,
@@ -720,6 +747,10 @@ def update_configuration(config: DorisConfig):
     # logging
     if args.log_level != _default_config.logging.level:
         config.logging.level = args.log_level
+    
+    # workers (add to config for HTTP mode)
+    if hasattr(args, 'workers'):
+        config.workers = args.workers
 
 
 async def main():
@@ -754,9 +785,16 @@ async def main():
         if config.transport == "stdio":
             await server.start_stdio()
         elif config.transport == "http":
-            await server.start_http(config.server_host, config.server_port)
+            # Get workers configuration with auto-detection support
+            workers = getattr(config, 'workers', 1)
+            if workers == 0:
+                import multiprocessing
+                workers = multiprocessing.cpu_count()
+                logger.info(f"Auto-detected {workers} CPU cores for worker 
processes")
+            
+            await server.start_http(config.server_host, config.server_port, 
workers)
         else:
-            logger.error(f"Unsupported transport protocol: {args.transport}")
+            logger.error(f"Unsupported transport protocol: {config.transport}")
             await server.shutdown()
             return 1
 
diff --git a/doris_mcp_server/multiworker_app.py 
b/doris_mcp_server/multiworker_app.py
new file mode 100644
index 0000000..9bf7c01
--- /dev/null
+++ b/doris_mcp_server/multiworker_app.py
@@ -0,0 +1,509 @@
+#!/usr/bin/env python3
+"""
+Multi-worker application module for doris-mcp-server
+
+This module provides full MCP functionality with multi-worker support.
+Each worker process creates its own MCP server and session manager using the 
same
+robust architecture as the single-worker mode.
+"""
+
+import os
+import asyncio
+from contextlib import asynccontextmanager
+import json
+import logging
+from typing import Any
+
+# Import MCP components with compatibility handling
+# Use the same import strategy as main.py for consistency
+MCP_VERSION = 'unknown'
+Server = None
+InitializationOptions = None
+Prompt = None
+Resource = None
+TextContent = None
+Tool = None
+
+def _import_mcp_with_compatibility():
+    """Import MCP components with multi-version compatibility"""
+    global MCP_VERSION, Server, InitializationOptions, Prompt, Resource, 
TextContent, Tool
+    
+    try:
+        # Strategy 1: Try direct server-only imports to avoid client-side 
issues
+        from mcp.server import Server as _Server
+        from mcp.server.models import InitializationOptions as _InitOptions
+        from mcp.types import (
+            Prompt as _Prompt,
+            Resource as _Resource, 
+            TextContent as _TextContent,
+            Tool as _Tool,
+        )
+        
+        # Assign to globals
+        Server = _Server
+        InitializationOptions = _InitOptions
+        Prompt = _Prompt
+        Resource = _Resource
+        TextContent = _TextContent
+        Tool = _Tool
+        
+        # Try to get version safely
+        try:
+            import mcp
+            MCP_VERSION = getattr(mcp, '__version__', None)
+            if not MCP_VERSION:
+                # Fallback: try to get version from package metadata
+                try:
+                    import importlib.metadata
+                    MCP_VERSION = importlib.metadata.version('mcp')
+                except Exception:
+                    # Second fallback: try pkg_resources
+                    try:
+                        import pkg_resources
+                        MCP_VERSION = 
pkg_resources.get_distribution('mcp').version
+                    except Exception:
+                        MCP_VERSION = 'detected-but-version-unknown'
+        except Exception:
+            # Version detection failed, but imports worked
+            try:
+                import importlib.metadata
+                MCP_VERSION = importlib.metadata.version('mcp')
+            except Exception:
+                try:
+                    import pkg_resources
+                    MCP_VERSION = pkg_resources.get_distribution('mcp').version
+                except Exception:
+                    MCP_VERSION = 'imported-successfully'
+            
+        logger = logging.getLogger(__name__)
+        logger.info(f"MCP components imported successfully in multiworker, 
version: {MCP_VERSION}")
+        return True
+        
+    except Exception as import_error:
+        logger = logging.getLogger(__name__)
+        
+        # Strategy 2: Handle RequestContext compatibility issues in 1.9.x 
versions
+        error_str = str(import_error).lower()
+        if 'requestcontext' in error_str and 'too few arguments' in error_str:
+            logger.warning(f"Detected MCP RequestContext compatibility issue: 
{import_error}")
+            logger.info("Attempting comprehensive workaround for MCP 1.9.x 
RequestContext issue...")
+            
+            try:
+                # Comprehensive monkey patch approach
+                import sys
+                import types
+                
+                # Create and install mock modules before any MCP imports
+                if 'mcp.shared.context' not in sys.modules:
+                    mock_context_module = 
types.ModuleType('mcp.shared.context')
+                    
+                    class FlexibleRequestContext:
+                        """Flexible RequestContext that accepts variable 
arguments"""
+                        def __init__(self, *args, **kwargs):
+                            self.args = args
+                            self.kwargs = kwargs
+                        
+                        def __class_getitem__(cls, params):
+                            # Accept any number of parameters and return cls
+                            return cls
+                        
+                        # Add other methods that might be called
+                        def __getattr__(self, name):
+                            return lambda *args, **kwargs: None
+                    
+                    mock_context_module.RequestContext = FlexibleRequestContext
+                    sys.modules['mcp.shared.context'] = mock_context_module
+                
+                # Also patch the typing system to be more permissive  
+                original_check_generic = None
+                try:
+                    import typing
+                    if hasattr(typing, '_check_generic'):
+                        original_check_generic = typing._check_generic
+                        def permissive_check_generic(cls, params, elen):
+                            # Don't enforce strict parameter count checking
+                            return
+                        typing._check_generic = permissive_check_generic
+                except Exception:
+                    pass
+                
+                # Clear any cached imports that might have failed
+                modules_to_clear = [k for k in sys.modules.keys() if 
k.startswith('mcp.')]
+                for module in modules_to_clear:
+                    if module in sys.modules:
+                        del sys.modules[module]
+                
+                # Now try importing again with the patches in place
+                from mcp.server import Server as _Server
+                from mcp.server.models import InitializationOptions as 
_InitOptions
+                from mcp.types import (
+                    Prompt as _Prompt,
+                    Resource as _Resource, 
+                    TextContent as _TextContent,
+                    Tool as _Tool,
+                )
+                
+                # Assign to globals
+                Server = _Server
+                InitializationOptions = _InitOptions
+                Prompt = _Prompt
+                Resource = _Resource
+                TextContent = _TextContent
+                Tool = _Tool
+                
+                # Try to detect actual version even in compatibility mode
+                try:
+                    import importlib.metadata
+                    actual_version = importlib.metadata.version('mcp')
+                    MCP_VERSION = f'compatibility-mode-{actual_version}'
+                except Exception:
+                    try:
+                        import pkg_resources
+                        actual_version = 
pkg_resources.get_distribution('mcp').version
+                        MCP_VERSION = f'compatibility-mode-{actual_version}'
+                    except Exception:
+                        MCP_VERSION = 'compatibility-mode-1.9.x'
+                
+                logger.info("MCP 1.9.x compatibility workaround successful in 
multiworker!")
+                
+                # Restore original typing function if we patched it
+                if original_check_generic:
+                    typing._check_generic = original_check_generic
+                
+                return True
+                
+            except Exception as workaround_error:
+                logger.error(f"MCP compatibility workaround failed in 
multiworker: {workaround_error}")
+                
+                # Restore original typing function if we patched it
+                if original_check_generic:
+                    try:
+                        import typing
+                        typing._check_generic = original_check_generic
+                    except Exception:
+                        pass
+        
+        logger.error(f"Failed to import MCP components in multiworker: 
{import_error}")
+        return False
+
+# Perform MCP import with compatibility handling
+if not _import_mcp_with_compatibility():
+    raise ImportError(
+        "Failed to import MCP components in multiworker. Please ensure MCP is 
properly installed. "
+        "Supported versions: 1.8.x, 1.9.x"
+    )
+
+from starlette.applications import Starlette
+from starlette.routing import Route
+from starlette.responses import JSONResponse, Response
+
+# Import Doris MCP components
+from .tools.tools_manager import DorisToolsManager
+from .tools.prompts_manager import DorisPromptsManager
+from .tools.resources_manager import DorisResourcesManager
+from .utils.config import DorisConfig
+from .utils.db import DorisConnectionManager
+from .utils.security import DorisSecurityManager
+
+# Global variables for worker-specific instances
+_worker_server = None
+_worker_session_manager = None
+_worker_connection_manager = None
+_worker_session_manager_context = None
+_worker_initialized = False
+
+def get_mcp_capabilities():
+    """Get MCP capabilities for worker - use the same logic as main.py"""
+    try:
+        # For MCP 1.9.x and newer
+        from mcp.server.lowlevel.server import NotificationOptions
+        
+        capabilities = {
+            "resources": {},
+            "tools": {},
+            "prompts": {},
+            "notification_options": {
+                "prompts_changed": True,
+                "resources_changed": True,
+                "tools_changed": True
+            }
+        }
+        return capabilities
+    except Exception as e:
+        # Import logger properly
+        from .utils.logger import get_logger
+        logger = get_logger(__name__)
+        logger.warning(f"Failed to get full capabilities in multiworker: {e}")
+        return {
+            "resources": {},
+            "tools": {},
+            "prompts": {}
+        }
+
+async def initialize_worker():
+    """Initialize MCP server and managers for this worker process"""
+    global _worker_server, _worker_session_manager, 
_worker_connection_manager, _worker_session_manager_context, _worker_initialized
+    
+    if _worker_initialized:
+        return
+    
+    try:
+        # Import logger properly
+        from .utils.logger import get_logger
+        logger = get_logger(__name__)
+        
+        logger.info(f"Initializing MCP worker process {os.getpid()}")
+        
+        # Create configuration
+        config = DorisConfig.from_env()
+        
+        # Initialize enhanced logging system
+        from .utils.config import ConfigManager
+        config_manager = ConfigManager(config)
+        config_manager.setup_logging()
+        
+        # Create security manager
+        security_manager = DorisSecurityManager(config)
+        
+        # Create connection manager
+        _worker_connection_manager = DorisConnectionManager(config, 
security_manager)
+        await _worker_connection_manager.initialize()
+        
+        # Create MCP server
+        _worker_server = Server("doris-mcp-server")
+        
+        # Create managers
+        resources_manager = DorisResourcesManager(_worker_connection_manager)
+        tools_manager = DorisToolsManager(_worker_connection_manager)
+        prompts_manager = DorisPromptsManager(_worker_connection_manager)
+        
+        # Setup MCP handlers
+        @_worker_server.list_resources()
+        async def handle_list_resources() -> list[Resource]:
+            """Handle resource list request"""
+            try:
+                logger.info("Handling resource list request in worker")
+                resources = await resources_manager.list_resources()
+                logger.info(f"Returning {len(resources)} resources from 
worker")
+                return resources
+            except Exception as e:
+                logger.error(f"Failed to handle resource list request in 
worker: {e}")
+                return []
+
+        @_worker_server.read_resource()
+        async def handle_read_resource(uri: str) -> str:
+            """Handle resource read request"""
+            try:
+                logger.info(f"Handling resource read request in worker: {uri}")
+                content = await resources_manager.read_resource(uri)
+                return content
+            except Exception as e:
+                logger.error(f"Failed to handle resource read request in 
worker: {e}")
+                return json.dumps(
+                    {"error": f"Failed to read resource: {str(e)}", "uri": 
uri},
+                    ensure_ascii=False,
+                    indent=2,
+                )
+
+        @_worker_server.list_tools()
+        async def handle_list_tools() -> list[Tool]:
+            """Handle tool list request"""
+            try:
+                logger.info("Handling tool list request in worker")
+                tools = await tools_manager.list_tools()
+                logger.info(f"Returning {len(tools)} tools from worker")
+                return tools
+            except Exception as e:
+                logger.error(f"Failed to handle tool list request in worker: 
{e}")
+                return []
+
+        @_worker_server.call_tool()
+        async def handle_call_tool(name: str, arguments: dict[str, Any]) -> 
list[TextContent]:
+            """Handle tool call request"""
+            try:
+                logger.info(f"Handling tool call request in worker: {name}")
+                result = await tools_manager.call_tool(name, arguments)
+                return [TextContent(type="text", text=result)]
+            except Exception as e:
+                logger.error(f"Failed to handle tool call request in worker: 
{e}")
+                error_result = json.dumps(
+                    {
+                        "error": f"Tool call failed: {str(e)}",
+                        "tool_name": name,
+                        "arguments": arguments,
+                    },
+                    ensure_ascii=False,
+                    indent=2,
+                )
+                return [TextContent(type="text", text=error_result)]
+
+        @_worker_server.list_prompts()
+        async def handle_list_prompts() -> list[Prompt]:
+            """Handle prompt list request"""
+            try:
+                logger.info("Handling prompt list request in worker")
+                prompts = await prompts_manager.list_prompts()
+                logger.info(f"Returning {len(prompts)} prompts from worker")
+                return prompts
+            except Exception as e:
+                logger.error(f"Failed to handle prompt list request in worker: 
{e}")
+                return []
+
+        @_worker_server.get_prompt()
+        async def handle_get_prompt(name: str, arguments: dict[str, Any]) -> 
str:
+            """Handle prompt get request"""
+            try:
+                logger.info(f"Handling prompt get request in worker: {name}")
+                result = await prompts_manager.get_prompt(name, arguments)
+                return result
+            except Exception as e:
+                logger.error(f"Failed to handle prompt get request in worker: 
{e}")
+                error_result = json.dumps(
+                    {
+                        "error": f"Failed to get prompt: {str(e)}",
+                        "prompt_name": name,
+                        "arguments": arguments,
+                    },
+                    ensure_ascii=False,
+                    indent=2,
+                )
+                return error_result
+        
+        # Create session manager for this worker
+        from mcp.server.streamable_http_manager import 
StreamableHTTPSessionManager
+        
+        _worker_session_manager = StreamableHTTPSessionManager(
+            app=_worker_server,
+            json_response=True,
+            stateless=True  # Use stateless mode for multi-worker compatibility
+        )
+        
+        # Start the session manager context
+        _worker_session_manager_context = _worker_session_manager.run()
+        await _worker_session_manager_context.__aenter__()
+        
+        _worker_initialized = True
+        logger.info(f"Worker {os.getpid()} MCP initialization completed 
successfully")
+        
+    except Exception as e:
+        from .utils.logger import get_logger
+        logger = get_logger(__name__)
+        logger.error(f"Failed to initialize worker {os.getpid()}: {e}")
+        import traceback
+        logger.error("Complete error stack:")
+        logger.error(traceback.format_exc())
+        raise
+
+async def health_check(request):
+    """Health check endpoint that shows worker PID"""
+    return JSONResponse({
+        "status": "healthy",
+        "service": "doris-mcp-server",
+        "worker_pid": os.getpid(),
+        "worker_mode": "multi-process-full-mcp",
+        "mcp_initialized": _worker_initialized,
+        "mcp_version": MCP_VERSION
+    })
+
+async def root_info(request):
+    """Root endpoint"""
+    return JSONResponse({
+        "service": "doris-mcp-server",
+        "mode": "multi-worker-full-mcp",
+        "worker_pid": os.getpid(),
+        "mcp_initialized": _worker_initialized,
+        "mcp_version": MCP_VERSION,
+        "endpoints": {
+            "health": "/health",
+            "mcp": "/mcp"
+        }
+    })
+
+@asynccontextmanager
+async def lifespan(app):
+    """Application lifespan manager"""
+    # Startup
+    try:
+        await initialize_worker()
+        # Import logger properly
+        from .utils.logger import get_logger
+        logger = get_logger(__name__)
+        logger.info(f"Worker {os.getpid()} startup completed")
+        
+        yield
+        
+    finally:
+        # Shutdown
+        from .utils.logger import get_logger
+        logger = get_logger(__name__)
+        
+        # Close session manager context
+        if _worker_session_manager_context:
+            try:
+                await _worker_session_manager_context.__aexit__(None, None, 
None)
+                logger.info(f"Worker {os.getpid()} session manager context 
closed")
+            except Exception as e:
+                logger.error(f"Error closing worker session manager context: 
{e}")
+        
+        if _worker_connection_manager:
+            try:
+                await _worker_connection_manager.close()
+                logger.info(f"Worker {os.getpid()} connection manager closed")
+            except Exception as e:
+                logger.error(f"Error closing worker connection manager: {e}")
+        
+        # Shutdown logging system
+        try:
+            from .utils.logger import shutdown_logging
+            shutdown_logging()
+        except Exception as e:
+            logger.error(f"Error shutting down logging system: {e}")
+
+async def mcp_asgi_app(scope, receive, send):
+    """ASGI app that handles MCP requests"""
+    if not _worker_initialized:
+        # Send error response if worker not initialized
+        await send({
+            'type': 'http.response.start',
+            'status': 503,
+            'headers': [(b'content-type', b'application/json')]
+        })
+        await send({
+            'type': 'http.response.body',
+            'body': b'{"error": "Worker not initialized"}'
+        })
+        return
+    
+    # Import logger properly
+    from .utils.logger import get_logger
+    logger = get_logger(__name__)
+    
+    # Get request path for logging
+    path = scope.get('path', '')
+    method = scope.get('method', 'UNKNOWN')
+    logger.debug(f"Worker {os.getpid()} handling MCP request: {method} {path}")
+    
+    # Handle the request directly without nested run context
+    await _worker_session_manager.handle_request(scope, receive, send)
+
+# Create Starlette app with basic routes
+basic_app = Starlette(
+    debug=True,
+    routes=[
+        Route("/", root_info, methods=["GET"]),
+        Route("/health", health_check, methods=["GET"]),
+    ],
+    lifespan=lifespan
+)
+
+# Create main ASGI app that routes between basic app and MCP
+async def app(scope, receive, send):
+    """Main ASGI app that routes requests"""
+    path = scope.get('path', '/')
+    
+    if path == "/mcp" or path.startswith('/mcp/'):
+        # Handle MCP requests with session manager
+        await mcp_asgi_app(scope, receive, send)
+    else:
+        # Handle other requests with basic Starlette app
+        await basic_app(scope, receive, send)
diff --git a/start_server.sh b/start_server.sh
index 3dfddfc..18284af 100755
--- a/start_server.sh
+++ b/start_server.sh
@@ -67,6 +67,7 @@ fi
 export MCP_TRANSPORT_TYPE="http"
 export MCP_HOST="${MCP_HOST:-0.0.0.0}"
 export MCP_PORT="${MCP_PORT:-3000}"
+export WORKERS="${WORKERS:-1}"
 export ALLOWED_ORIGINS="${ALLOWED_ORIGINS:-*}"
 export LOG_LEVEL="${LOG_LEVEL:-info}"
 export MCP_ALLOW_CREDENTIALS="${MCP_ALLOW_CREDENTIALS:-false}"
@@ -80,10 +81,11 @@ echo -e "${YELLOW}Service will run on 
http://${MCP_HOST}:${MCP_PORT}/mcp${NC}";
 echo -e "${YELLOW}Health Check: http://${MCP_HOST}:${MCP_PORT}/health${NC}";
 echo -e "${YELLOW}MCP Endpoint: http://${MCP_HOST}:${MCP_PORT}/mcp${NC}";
 echo -e "${YELLOW}Local access: http://localhost:${MCP_PORT}/mcp${NC}";
+echo -e "${YELLOW}Workers: ${WORKERS}${NC}"
 echo -e "${YELLOW}Use Ctrl+C to stop the service${NC}"
 
 # Start the server in HTTP mode (Streamable HTTP)
-python -m doris_mcp_server.main --transport http --host ${MCP_HOST} --port 
${MCP_PORT}
+python -m doris_mcp_server.main --transport http --host ${MCP_HOST} --port 
${MCP_PORT} --workers ${WORKERS}
 
 # Check exit status
 if [ $? -ne 0 ]; then


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to