This is an automated email from the ASF dual-hosted git repository.

chenliang613 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 5eb09fb274 add agent module for AI-DATA (#4373)
5eb09fb274 is described below

commit 5eb09fb274d4603ca85a1c1d3a891a0c650462c0
Author: Liang Chen <[email protected]>
AuthorDate: Tue Nov 4 16:39:36 2025 +0800

    add agent module for AI-DATA (#4373)
---
 .DS_Store                          | Bin 6148 -> 6148 bytes
 Agent_module/Agent_Demo.python     | 189 +++++++++++++++++++++++++++++++++++++
 Agent_module/Chat_Agent.python     |  97 +++++++++++++++++++
 Agent_module/Framework.python      |  78 +++++++++++++++
 Agent_module/Implementation.python | 119 +++++++++++++++++++++++
 Agent_module/Manager.python        | 103 ++++++++++++++++++++
 6 files changed, 586 insertions(+)

diff --git a/.DS_Store b/.DS_Store
index 076251c989..fbf5a35595 100644
Binary files a/.DS_Store and b/.DS_Store differ
diff --git a/Agent_module/Agent_Demo.python b/Agent_module/Agent_Demo.python
new file mode 100644
index 0000000000..0cc21649a9
--- /dev/null
+++ b/Agent_module/Agent_Demo.python
@@ -0,0 +1,189 @@
+async def demo_single_agents():
+    """Demo individual agents"""
+    print("=== Single Agent Demo ===\n")
+    
+    # Create agents
+    echo_agent = EchoAgent()
+    calculator_agent = CalculatorAgent()
+    weather_agent = WeatherAgent()
+    translator_agent = TranslatorAgent()
+    chat_agent = ChatAgent()
+    
+    # Test echo agent
+    print("1. Testing Echo Agent:")
+    response = await echo_agent.process(AgentMessage(
+        content="Hello, world!", 
+        sender="User",
+        timestamp=time.time()
+    ))
+    print(f"Input: Hello, world!")
+    print(f"Output: {response.content}\n")
+    
+    # Test calculator agent
+    print("2. Testing Calculator Agent:")
+    response = await calculator_agent.process(AgentMessage(
+        content="(15 + 25) * 2", 
+        sender="User",
+        timestamp=time.time()
+    ))
+    print(f"Input: (15 + 25) * 2")
+    print(f"Output: {response.content}\n")
+    
+    # Test weather agent
+    print("3. Testing Weather Agent:")
+    response = await weather_agent.process(AgentMessage(
+        content="Tokyo", 
+        sender="User",
+        timestamp=time.time()
+    ))
+    print(f"Input: Tokyo")
+    print(f"Output: {response.content}\n")
+    
+    # Test translator agent
+    print("4. Testing Translator Agent:")
+    response = await translator_agent.process(AgentMessage(
+        content="Hello, how are you?", 
+        sender="User",
+        timestamp=time.time(),
+        metadata={"target_language": "french"}
+    ))
+    print(f"Input: Hello, how are you? (to French)")
+    print(f"Output: {response.content}\n")
+    
+    # Test chat agent
+    print("5. Testing Chat Agent:")
+    response = await chat_agent.process(AgentMessage(
+        content="Hello! What can you do?", 
+        sender="User",
+        timestamp=time.time()
+    ))
+    print(f"Input: Hello! What can you do?")
+    print(f"Output: {response.content}\n")
+
+async def demo_agent_system():
+    """Demo the complete agent system"""
+    print("\n=== Agent System Demo ===\n")
+    
+    # Create agent manager and register agents
+    manager = AgentManager()
+    
+    manager.register_agent(EchoAgent())
+    manager.register_agent(CalculatorAgent())
+    manager.register_agent(WeatherAgent())
+    manager.register_agent(TranslatorAgent())
+    manager.register_agent(ChatAgent())
+    
+    print(f"Registered {len(manager.agents)} agents:")
+    for agent_name in manager.agents.keys():
+        print(f"  - {agent_name}")
+    
+    # Test individual agent communication
+    print("\n1. Testing Calculator Agent through Manager:")
+    response = await manager.send_message("CalculatorAgent", "45 + 17 * 3")
+    print(f"Response: {response.content}")
+    
+    print("\n2. Testing Weather Agent through Manager:")
+    response = await manager.send_message("WeatherAgent", "London")
+    print(f"Response: {response.content}")
+    
+    print("\n3. Testing Broadcast Message:")
+    responses = await manager.broadcast_message("Hello everyone!")
+    print("Broadcast responses:")
+    for agent_name, response in responses.items():
+        print(f"  {agent_name}: {response.content[:50]}...")
+    
+    # Demonstrate agent discovery
+    print("\n4. Agent Discovery:")
+    math_agents = manager.find_agent_by_capability("calculation")
+    print(f"Agents for calculations: {math_agents}")
+    
+    weather_agents = manager.find_agent_by_capability("weather")
+    print(f"Agents for weather: {weather_agents}")
+    
+    # Show system statistics
+    print("\n5. System Statistics:")
+    stats = manager.get_system_stats()
+    for key, value in stats.items():
+        print(f"  {key}: {value}")
+
+async def interactive_demo():
+    """Interactive demo where users can talk to agents"""
+    print("\n=== Interactive Demo ===\n")
+    
+    manager = AgentManager()
+    manager.register_agent(ChatAgent())
+    manager.register_agent(CalculatorAgent())
+    manager.register_agent(WeatherAgent())
+    manager.register_agent(TranslatorAgent())
+    
+    print("Available agents: ChatAgent, CalculatorAgent, WeatherAgent, TranslatorAgent")
+    print("Type 'quit' to exit, 'help' for commands, 'agents' to list agents\n")
+    
+    while True:
+        try:
+            user_input = input("You: ").strip()
+            
+            if user_input.lower() in ['quit', 'exit']:
+                break
+            elif user_input.lower() == 'help':
+                print("\nAvailable commands:")
+                print("  'calc [expression]' - Use calculator")
+                print("  'weather [city]' - Get weather")
+                print("  'translate [text] to [language]' - Translate text")
+                print("  'agents' - List all agents")
+                print("  'stats' - Show system statistics")
+                print("  'quit' - Exit\n")
+                continue
+            elif user_input.lower() == 'agents':
+                print("\nRegistered agents:")
+                for name, agent in manager.agents.items():
+                    print(f"  {name}: {agent.description}")
+                print()
+                continue
+            elif user_input.lower() == 'stats':
+                stats = manager.get_system_stats()
+                print("\nSystem Statistics:")
+                for key, value in stats.items():
+                    print(f"  {key}: {value}")
+                print()
+                continue
+            
+            # Route to appropriate agent based on input
+            if user_input.startswith('calc '):
+                expression = user_input[5:]
+                response = await manager.send_message("CalculatorAgent", expression)
+                print(f"Calculator: {response.content}\n")
+            
+            elif user_input.startswith('weather '):
+                city = user_input[8:]
+                response = await manager.send_message("WeatherAgent", city)
+                print(f"Weather: {response.content}\n")
+            
+            elif user_input.startswith('translate '):
+                # Simple parsing for translation commands
+                parts = user_input[10:].split(' to ')
+                if len(parts) == 2:
+                    text, language = parts
+                    response = await manager.send_message(
+                        "TranslatorAgent", 
+                        text, 
+                        {"target_language": language.strip()}
+                    )
+                    print(f"Translator: {response.content}\n")
+                else:
+                    print("Usage: translate [text] to [language]\n")
+            
+            else:
+                # Default to chat agent
+                response = await manager.send_message("ChatAgent", user_input)
+                print(f"ChatAgent: {response.content}\n")
+                
+        except KeyboardInterrupt:
+            break
+        except Exception as e:
+            print(f"Error: {e}\n")
+
+if __name__ == "__main__":
+    # Run demos
+    asyncio.run(demo_single_agents())
+    asyncio.run(demo_agent_system())
\ No newline at end of file
diff --git a/Agent_module/Chat_Agent.python b/Agent_module/Chat_Agent.python
new file mode 100644
index 0000000000..2a4a45ccdf
--- /dev/null
+++ b/Agent_module/Chat_Agent.python
@@ -0,0 +1,97 @@
+class ChatAgent(BaseAgent):
+    """Intelligent chat agent with context awareness"""
+    
+    def __init__(self, name: str = "ChatAgent"):
+        super().__init__(name, "Intelligent conversational agent")
+        self.context = {}
+        self.personality_traits = [
+            "friendly", "helpful", "knowledgeable", "enthusiastic"
+        ]
+    
+    async def _process_impl(self, message: AgentMessage) -> AgentMessage:
+        user_input = str(message.content).lower()
+        user_id = message.metadata.get("user_id", "unknown")
+        
+        # Update context
+        if user_id not in self.context:
+            self.context[user_id] = {
+                "conversation_count": 0,
+                "last_interaction": time.time(),
+                "topics": set()
+            }
+        
+        self.context[user_id]["conversation_count"] += 1
+        self.context[user_id]["last_interaction"] = time.time()
+        
+        # Simple intent recognition
+        response = await self._generate_response(user_input, user_id)
+        
+        return AgentMessage(
+            content=response,
+            sender=self.name,
+            timestamp=time.time(),
+            message_type="chat_response",
+            metadata={"user_context": self.context[user_id]}
+        )
+    
+    async def _generate_response(self, user_input: str, user_id: str) -> str:
+        """Generate context-aware responses"""
+        
+        # Greeting detection
+        if any(word in user_input for word in ["hello", "hi", "hey", "greetings"]):
+            return random.choice([
+                "Hello! How can I assist you today?",
+                "Hi there! What can I help you with?",
+                "Greetings! I'm here to help.",
+                "Hello! Nice to meet you."
+            ])
+        
+        # Question detection
+        elif "?" in user_input or any(word in user_input for word in ["what", "how", "when", "where", "why"]):
+            return await self._answer_question(user_input)
+        
+        # Weather inquiry
+        elif "weather" in user_input:
+            return "I can help with weather information! Please use the WeatherAgent for accurate weather data."
+        
+        # Calculation request
+        elif any(word in user_input for word in ["calculate", "math", "equation"]):
+            return "I can help with calculations! Try using the CalculatorAgent for mathematical operations."
+        
+        # Translation request
+        elif any(word in user_input for word in ["translate", "translation"]):
+            return "I can assist with translations! The TranslatorAgent specializes in language translation."
+        
+        # Default response
+        else:
+            return random.choice([
+                "That's interesting! Can you tell me more?",
+                "I understand. How can I help you with that?",
+                "Thanks for sharing! Is there anything specific you'd like to know?",
+                "I see. What would you like to do next?"
+            ])
+    
+    async def _answer_question(self, question: str) -> str:
+        """Answer general knowledge questions"""
+        question_lower = question.lower()
+        
+        if "time" in question_lower:
+            return f"The current time is {time.strftime('%H:%M:%S')}"
+        
+        elif "date" in question_lower:
+            return f"Today's date is {time.strftime('%Y-%m-%d')}"
+        
+        elif "name" in question_lower:
+            return "I'm ChatAgent, your friendly AI assistant!"
+        
+        elif "purpose" in question_lower or "what can you do" in question_lower:
+            return "I can chat with you, answer questions, and coordinate with other specialized agents for weather, calculations, translations, and more!"
+        
+        else:
+            responses = [
+                "That's a good question! Let me think about it...",
+                "I'm not entirely sure about that, but I'd be happy to help you find out!",
+                "Interesting question! Here's what I know about that topic...",
+                "I understand your curiosity about that subject."
+            ]
+            return random.choice(responses)
\ No newline at end of file
diff --git a/Agent_module/Framework.python b/Agent_module/Framework.python
new file mode 100644
index 0000000000..d7c5e6b902
--- /dev/null
+++ b/Agent_module/Framework.python
@@ -0,0 +1,78 @@
+import asyncio
+import time
+from abc import ABC, abstractmethod
+from typing import Any, Dict, List, Optional, Callable
+from dataclasses import dataclass
+from enum import Enum
+import random
+import json
+
+class AgentStatus(Enum):
+    IDLE = "idle"
+    PROCESSING = "processing"
+    ERROR = "error"
+    STOPPED = "stopped"
+
+@dataclass
+class AgentMessage:
+    content: Any
+    sender: str
+    timestamp: float
+    message_type: str = "text"
+    metadata: Dict[str, Any] = None
+
+    def __post_init__(self):
+        if self.metadata is None:
+            self.metadata = {}
+        self.timestamp = self.timestamp or time.time()
+
+class BaseAgent(ABC):
+    """Base class for all agents"""
+    
+    def __init__(self, name: str, description: str = ""):
+        self.name = name
+        self.description = description
+        self.status = AgentStatus.IDLE
+        self.message_history: List[AgentMessage] = []
+        self._callbacks: List[Callable] = []
+        
+    async def process(self, message: AgentMessage) -> AgentMessage:
+        """Process a message and return response"""
+        self.status = AgentStatus.PROCESSING
+        self.message_history.append(message)
+        
+        try:
+            result = await self._process_impl(message)
+            self.status = AgentStatus.IDLE
+            self._notify_callbacks(message, result)
+            return result
+        except Exception as e:
+            self.status = AgentStatus.ERROR
+            error_msg = AgentMessage(
+                content=f"Error: {str(e)}",
+                sender=self.name,
+                timestamp=time.time(),
+                message_type="error"
+            )
+            return error_msg
+    
+    @abstractmethod
+    async def _process_impl(self, message: AgentMessage) -> AgentMessage:
+        """Agent-specific processing logic"""
+        pass
+    
+    def add_callback(self, callback: Callable):
+        """Add callback for message processing events"""
+        self._callbacks.append(callback)
+    
+    def _notify_callbacks(self, input_msg: AgentMessage, output_msg: AgentMessage):
+        """Notify all callbacks"""
+        for callback in self._callbacks:
+            try:
+                callback(input_msg, output_msg)
+            except Exception:
+                pass  # Don't break agent if callback fails
+    
+    def get_recent_messages(self, count: int = 10) -> List[AgentMessage]:
+        """Get recent message history"""
+        return self.message_history[-count:]
\ No newline at end of file
diff --git a/Agent_module/Implementation.python b/Agent_module/Implementation.python
new file mode 100644
index 0000000000..59a806ce50
--- /dev/null
+++ b/Agent_module/Implementation.python
@@ -0,0 +1,119 @@
+class EchoAgent(BaseAgent):
+    """Simple echo agent that repeats messages"""
+    
+    def __init__(self, name: str = "EchoAgent"):
+        super().__init__(name, "Echoes back received messages")
+    
+    async def _process_impl(self, message: AgentMessage) -> AgentMessage:
+        await asyncio.sleep(0.1)  # Simulate processing time
+        
+        response_content = f"Echo: {message.content}"
+        
+        return AgentMessage(
+            content=response_content,
+            sender=self.name,
+            timestamp=time.time(),
+            message_type="echo_response"
+        )
+
+class CalculatorAgent(BaseAgent):
+    """Agent that performs mathematical calculations"""
+    
+    def __init__(self, name: str = "CalculatorAgent"):
+        super().__init__(name, "Performs mathematical calculations")
+    
+    async def _process_impl(self, message: AgentMessage) -> AgentMessage:
+        try:
+            expression = str(message.content).strip()
+            # Security: Only allow basic math operations
+            allowed_chars = set('0123456789+-*/.() ')
+            if not all(c in allowed_chars for c in expression):
+                raise ValueError("Invalid characters in expression")
+            
+            result = eval(expression)  # In real code, use a safe eval or parser
+            
+            return AgentMessage(
+                content=f"{expression} = {result}",
+                sender=self.name,
+                timestamp=time.time(),
+                message_type="calculation_result"
+            )
+        except Exception as e:
+            return AgentMessage(
+                content=f"Calculation error: {str(e)}",
+                sender=self.name,
+                timestamp=time.time(),
+                message_type="error"
+            )
+
+class WeatherAgent(BaseAgent):
+    """Simulated weather information agent"""
+    
+    def __init__(self, name: str = "WeatherAgent"):
+        super().__init__(name, "Provides weather information")
+        self.weather_data = {
+            "New York": {"temp": 22, "condition": "Sunny", "humidity": 65},
+            "London": {"temp": 15, "condition": "Cloudy", "humidity": 80},
+            "Tokyo": {"temp": 18, "condition": "Rainy", "humidity": 75},
+            "Sydney": {"temp": 25, "condition": "Clear", "humidity": 60}
+        }
+    
+    async def _process_impl(self, message: AgentMessage) -> AgentMessage:
+        location = str(message.content).strip().title()
+        
+        await asyncio.sleep(0.2)  # Simulate API call
+        
+        if location in self.weather_data:
+            weather = self.weather_data[location]
+            response = (
+                f"Weather in {location}:\n"
+                f"Temperature: {weather['temp']}°C\n"
+                f"Condition: {weather['condition']}\n"
+                f"Humidity: {weather['humidity']}%"
+            )
+        else:
+            # Generate random weather for unknown locations
+            temp = random.randint(-10, 35)
+            conditions = ["Sunny", "Cloudy", "Rainy", "Snowy"]
+            condition = random.choice(conditions)
+            humidity = random.randint(30, 95)
+            
+            response = (
+                f"Weather in {location} (simulated):\n"
+                f"Temperature: {temp}°C\n"
+                f"Condition: {condition}\n"
+                f"Humidity: {humidity}%"
+            )
+        
+        return AgentMessage(
+            content=response,
+            sender=self.name,
+            timestamp=time.time(),
+            message_type="weather_report"
+        )
+
+class TranslatorAgent(BaseAgent):
+    """Simple translation agent (simulated)"""
+    
+    def __init__(self, name: str = "TranslatorAgent"):
+        super().__init__(name, "Translates text between languages")
+        self.supported_languages = ["english", "spanish", "french", "german", "italian"]
+    
+    async def _process_impl(self, message: AgentMessage) -> AgentMessage:
+        text = str(message.content)
+        target_language = message.metadata.get("target_language", "spanish").lower()
+        
+        await asyncio.sleep(0.3)  # Simulate translation processing
+        
+        if target_language not in self.supported_languages:
+            response = f"Sorry, I don't support {target_language}. Supported languages: {', '.join(self.supported_languages)}"
+        else:
+            # Simulated translation - in real world, this would call a translation API
+            response = f"[{target_language.upper()} TRANSLATION] {text} -> Translated text in {target_language}"
+        
+        return AgentMessage(
+            content=response,
+            sender=self.name,
+            timestamp=time.time(),
+            message_type="translation"
+        )
\ No newline at end of file
diff --git a/Agent_module/Manager.python b/Agent_module/Manager.python
new file mode 100644
index 0000000000..ff51910795
--- /dev/null
+++ b/Agent_module/Manager.python
@@ -0,0 +1,103 @@
+class AgentManager:
+    """Manages multiple agents and coordinates between them"""
+    
+    def __init__(self):
+        self.agents: Dict[str, BaseAgent] = {}
+        self.conversation_log: List[Dict] = []
+    
+    def register_agent(self, agent: BaseAgent):
+        """Register a new agent"""
+        self.agents[agent.name] = agent
+        
+        # Add logging callback to all agents
+        agent.add_callback(self._log_interaction)
+    
+    async def send_message(self, agent_name: str, content: Any, 
+                         metadata: Dict[str, Any] = None) -> AgentMessage:
+        """Send message to specific agent"""
+        if agent_name not in self.agents:
+            raise ValueError(f"Agent '{agent_name}' not found")
+        
+        message = AgentMessage(
+            content=content,
+            sender="User",
+            timestamp=time.time(),
+            metadata=metadata or {}
+        )
+        
+        agent = self.agents[agent_name]
+        response = await agent.process(message)
+        
+        return response
+    
+    async def broadcast_message(self, content: Any, 
+                              metadata: Dict[str, Any] = None) -> Dict[str, AgentMessage]:
+        """Send message to all agents and collect responses"""
+        message = AgentMessage(
+            content=content,
+            sender="Broadcast",
+            timestamp=time.time(),
+            metadata=metadata or {}
+        )
+        
+        tasks = []
+        for agent_name, agent in self.agents.items():
+            tasks.append(agent.process(message))
+        
+        responses = await asyncio.gather(*tasks, return_exceptions=True)
+        
+        results = {}
+        for agent_name, response in zip(self.agents.keys(), responses):
+            if isinstance(response, Exception):
+                results[agent_name] = AgentMessage(
+                    content=f"Error: {str(response)}",
+                    sender=agent_name,
+                    timestamp=time.time(),
+                    message_type="error"
+                )
+            else:
+                results[agent_name] = response
+        
+        return results
+    
+    def find_agent_by_capability(self, keyword: str) -> List[str]:
+        """Find agents that might handle a specific capability"""
+        matching_agents = []
+        for agent_name, agent in self.agents.items():
+            if (keyword.lower() in agent.name.lower() or 
+                keyword.lower() in agent.description.lower()):
+                matching_agents.append(agent_name)
+        return matching_agents
+    
+    def _log_interaction(self, input_msg: AgentMessage, output_msg: AgentMessage):
+        """Log agent interactions"""
+        log_entry = {
+            "timestamp": time.time(),
+            "input": {
+                "content": input_msg.content,
+                "sender": input_msg.sender,
+                "type": input_msg.message_type
+            },
+            "output": {
+                "content": output_msg.content,
+                "sender": output_msg.sender,
+                "type": output_msg.message_type
+            },
+            "processing_time": output_msg.timestamp - input_msg.timestamp
+        }
+        self.conversation_log.append(log_entry)
+    
+    def get_agent_statuses(self) -> Dict[str, str]:
+        """Get status of all agents"""
+        return {name: agent.status.value for name, agent in self.agents.items()}
+    
+    def get_system_stats(self) -> Dict[str, Any]:
+        """Get system statistics"""
+        total_messages = sum(len(agent.message_history) for agent in self.agents.values())
+        
+        return {
+            "total_agents": len(self.agents),
+            "total_messages_processed": total_messages,
+            "conversation_log_entries": len(self.conversation_log),
+            "agent_statuses": self.get_agent_statuses()
+        }
\ No newline at end of file

Reply via email to