This is an automated email from the ASF dual-hosted git repository.
chenliang613 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 5eb09fb274 add agent module for AI-DATA (#4373)
5eb09fb274 is described below
commit 5eb09fb274d4603ca85a1c1d3a891a0c650462c0
Author: Liang Chen <[email protected]>
AuthorDate: Tue Nov 4 16:39:36 2025 +0800
add agent module for AI-DATA (#4373)
---
.DS_Store | Bin 6148 -> 6148 bytes
Agent_module/Agent_Demo.python | 189 +++++++++++++++++++++++++++++++++++++
Agent_module/Chat_Agent.python | 97 +++++++++++++++++++
Agent_module/Framework.python | 78 +++++++++++++++
Agent_module/Implementation.python | 119 +++++++++++++++++++++++
Agent_module/Manager.python | 103 ++++++++++++++++++++
6 files changed, 586 insertions(+)
diff --git a/.DS_Store b/.DS_Store
index 076251c989..fbf5a35595 100644
Binary files a/.DS_Store and b/.DS_Store differ
diff --git a/Agent_module/Agent_Demo.python b/Agent_module/Agent_Demo.python
new file mode 100644
index 0000000000..0cc21649a9
--- /dev/null
+++ b/Agent_module/Agent_Demo.python
@@ -0,0 +1,189 @@
async def demo_single_agents():
    """Call each agent type once, directly, and print the exchange.

    Every agent is driven through ``process`` with a hand-built
    ``AgentMessage``; no ``AgentManager`` is involved. The scenarios are
    kept in a table so that adding a new agent to the demo is a one-line
    change instead of another copy of the same stanza.
    """
    print("=== Single Agent Demo ===\n")

    # One row per scenario:
    # (heading, agent, message content, message metadata, text shown as input)
    scenarios = [
        ("Echo Agent", EchoAgent(), "Hello, world!", None,
         "Hello, world!"),
        ("Calculator Agent", CalculatorAgent(), "(15 + 25) * 2", None,
         "(15 + 25) * 2"),
        ("Weather Agent", WeatherAgent(), "Tokyo", None,
         "Tokyo"),
        ("Translator Agent", TranslatorAgent(), "Hello, how are you?",
         {"target_language": "french"},
         "Hello, how are you? (to French)"),
        ("Chat Agent", ChatAgent(), "Hello! What can you do?", None,
         "Hello! What can you do?"),
    ]

    for number, (title, agent, content, metadata, shown_input) in enumerate(
            scenarios, start=1):
        print(f"{number}. Testing {title}:")
        response = await agent.process(AgentMessage(
            content=content,
            sender="User",
            timestamp=time.time(),
            metadata=metadata,  # None is normalised to {} by AgentMessage
        ))
        print(f"Input: {shown_input}")
        print(f"Output: {response.content}\n")
+
async def demo_agent_system():
    """Run a scripted tour of ``AgentManager``.

    Registers one agent of each type, then demonstrates targeted
    messages, a broadcast, keyword-based agent discovery and the
    aggregate statistics, printing the result of each step.
    """
    print("\n=== Agent System Demo ===\n")

    # Create agent manager and register agents
    manager = AgentManager()
    for agent in (EchoAgent(), CalculatorAgent(), WeatherAgent(),
                  TranslatorAgent(), ChatAgent()):
        manager.register_agent(agent)

    print(f"Registered {len(manager.agents)} agents:")
    for agent_name in manager.agents:
        print(f" - {agent_name}")

    # Test individual agent communication
    print("\n1. Testing Calculator Agent through Manager:")
    response = await manager.send_message("CalculatorAgent", "45 + 17 * 3")
    print(f"Response: {response.content}")

    print("\n2. Testing Weather Agent through Manager:")
    response = await manager.send_message("WeatherAgent", "London")
    print(f"Response: {response.content}")

    print("\n3. Testing Broadcast Message:")
    responses = await manager.broadcast_message("Hello everyone!")
    print("Broadcast responses:")
    for agent_name, reply in responses.items():
        # ``content`` is typed ``Any``; stringify before slicing so a
        # non-string payload cannot break the preview.
        preview = str(reply.content)[:50]
        print(f" {agent_name}: {preview}...")

    # Demonstrate agent discovery
    print("\n4. Agent Discovery:")
    math_agents = manager.find_agent_by_capability("calculation")
    print(f"Agents for calculations: {math_agents}")

    weather_agents = manager.find_agent_by_capability("weather")
    print(f"Agents for weather: {weather_agents}")

    # Show system statistics
    print("\n5. System Statistics:")
    for key, value in manager.get_system_stats().items():
        print(f" {key}: {value}")
+
async def interactive_demo():
    """Read commands from stdin and route them to the registered agents.

    Built-in commands (``help``, ``agents``, ``stats``, ``quit``/``exit``)
    are handled locally. Input starting with ``calc ``, ``weather `` or
    ``translate `` goes to the matching specialised agent; anything else
    goes to ``ChatAgent``. All command words are matched
    case-insensitively. The loop ends on ``quit``/``exit``, Ctrl-C, or
    end of input.
    """
    print("\n=== Interactive Demo ===\n")

    manager = AgentManager()
    for agent in (ChatAgent(), CalculatorAgent(), WeatherAgent(),
                  TranslatorAgent()):
        manager.register_agent(agent)

    print("Available agents: ChatAgent, CalculatorAgent, WeatherAgent, "
          "TranslatorAgent")
    print("Type 'quit' to exit, 'help' for commands, 'agents' to list "
          "agents\n")

    while True:
        try:
            user_input = input("You: ").strip()
            command = user_input.lower()

            if not user_input:
                # Nothing typed; prompt again instead of bothering an agent.
                continue

            if command in ('quit', 'exit'):
                break
            elif command == 'help':
                print("\nAvailable commands:")
                print(" 'calc [expression]' - Use calculator")
                print(" 'weather [city]' - Get weather")
                print(" 'translate [text] to [language]' - Translate text")
                print(" 'agents' - List all agents")
                print(" 'stats' - Show system statistics")
                print(" 'quit' - Exit\n")
                continue
            elif command == 'agents':
                print("\nRegistered agents:")
                for name, agent in manager.agents.items():
                    print(f" {name}: {agent.description}")
                print()
                continue
            elif command == 'stats':
                stats = manager.get_system_stats()
                print("\nSystem Statistics:")
                for key, value in stats.items():
                    print(f" {key}: {value}")
                print()
                continue

            # Route to appropriate agent based on input. The prefix is
            # tested on the lower-cased text, the payload is taken from
            # the original text so its casing is preserved.
            if command.startswith('calc '):
                expression = user_input[5:]
                response = await manager.send_message("CalculatorAgent",
                                                      expression)
                print(f"Calculator: {response.content}\n")

            elif command.startswith('weather '):
                city = user_input[8:]
                response = await manager.send_message("WeatherAgent", city)
                print(f"Weather: {response.content}\n")

            elif command.startswith('translate '):
                # Split on the LAST " to " so the text itself may contain
                # " to " (e.g. "translate I want to go to french").
                text, separator, language = user_input[10:].rpartition(' to ')
                language = language.strip()
                if separator and text.strip() and language:
                    response = await manager.send_message(
                        "TranslatorAgent",
                        text,
                        {"target_language": language}
                    )
                    print(f"Translator: {response.content}\n")
                else:
                    print("Usage: translate [text] to [language]\n")

            else:
                # Default to chat agent
                response = await manager.send_message("ChatAgent", user_input)
                print(f"ChatAgent: {response.content}\n")

        except (KeyboardInterrupt, EOFError):
            # EOFError means stdin was closed; without this exit the
            # generic handler below would print an error and loop forever.
            break
        except Exception as e:
            # Top-level boundary of the demo loop: report and keep going.
            print(f"Error: {e}\n")
+
if __name__ == "__main__":
    # Run the two scripted demos back to back, each in its own event loop.
    # interactive_demo() is defined in this file but is not started here.
    for demo in (demo_single_agents, demo_agent_system):
        asyncio.run(demo())
\ No newline at end of file
diff --git a/Agent_module/Chat_Agent.python b/Agent_module/Chat_Agent.python
new file mode 100644
index 0000000000..2a4a45ccdf
--- /dev/null
+++ b/Agent_module/Chat_Agent.python
@@ -0,0 +1,97 @@
class ChatAgent(BaseAgent):
    """Rule-based conversational agent that keeps per-user counters.

    Replies are chosen by keyword matching on the lower-cased message.
    Keywords are matched against whole words (or word prefixes for topic
    keywords) rather than raw substrings, so "this" is not mistaken for
    the greeting "hi" and "sometimes" is not taken as a question about
    the time.
    """

    _GREETING_WORDS = frozenset({"hello", "hi", "hey", "greetings"})
    _QUESTION_WORDS = frozenset({"what", "how", "when", "where", "why"})

    def __init__(self, name: str = "ChatAgent"):
        super().__init__(name, "Intelligent conversational agent")
        # user_id -> {"conversation_count", "last_interaction", "topics"}
        self.context = {}
        self.personality_traits = [
            "friendly", "helpful", "knowledgeable", "enthusiastic"
        ]

    @staticmethod
    def _tokenize(text: str) -> set:
        """Return the set of alphanumeric words in *text*."""
        spaced = "".join(ch if ch.isalnum() else " " for ch in text)
        return set(spaced.split())

    @staticmethod
    def _mentions(words, stems) -> bool:
        """Return True if any word starts with one of *stems*."""
        return any(word.startswith(stem) for word in words for stem in stems)

    async def _process_impl(self, message: AgentMessage) -> AgentMessage:
        """Update the sender's context and build a chat reply.

        The user is identified by ``metadata["user_id"]`` and falls back
        to ``"unknown"``. The reply's metadata carries a copy of that
        user's context under ``"user_context"``.
        """
        user_input = str(message.content).lower()
        user_id = (message.metadata or {}).get("user_id", "unknown")

        # Update context
        if user_id not in self.context:
            self.context[user_id] = {
                "conversation_count": 0,
                "last_interaction": time.time(),
                "topics": set()
            }

        state = self.context[user_id]
        state["conversation_count"] += 1
        state["last_interaction"] = time.time()

        # Simple intent recognition
        response = await self._generate_response(user_input, user_id)

        # Hand out a snapshot, not the live dict, so later updates (or a
        # caller mutating the metadata) cannot alter this agent's state
        # or messages that were already returned.
        snapshot = dict(state, topics=set(state["topics"]))

        return AgentMessage(
            content=response,
            sender=self.name,
            timestamp=time.time(),
            message_type="chat_response",
            metadata={"user_context": snapshot}
        )

    async def _generate_response(self, user_input: str, user_id: str) -> str:
        """Pick a reply for *user_input* (expected to be lower-cased).

        Checks run in this order: greeting, question, weather,
        calculation, translation, then a generic fallback.
        """
        words = self._tokenize(user_input)

        # Greeting detection
        if words & self._GREETING_WORDS:
            return random.choice([
                "Hello! How can I assist you today?",
                "Hi there! What can I help you with?",
                "Greetings! I'm here to help.",
                "Hello! Nice to meet you."
            ])

        # Question detection
        if "?" in user_input or words & self._QUESTION_WORDS:
            return await self._answer_question(user_input)

        # Weather inquiry
        if self._mentions(words, ("weather",)):
            return ("I can help with weather information! Please use the "
                    "WeatherAgent for accurate weather data.")

        # Calculation request
        if self._mentions(words, ("calculate", "math", "equation")):
            return ("I can help with calculations! Try using the "
                    "CalculatorAgent for mathematical operations.")

        # Translation request
        if self._mentions(words, ("translate", "translation")):
            return ("I can assist with translations! The TranslatorAgent "
                    "specializes in language translation.")

        # Default response
        return random.choice([
            "That's interesting! Can you tell me more?",
            "I understand. How can I help you with that?",
            "Thanks for sharing! Is there anything specific you'd like to "
            "know?",
            "I see. What would you like to do next?"
        ])

    async def _answer_question(self, question: str) -> str:
        """Answer a few built-in questions; otherwise return a stock reply."""
        question_lower = question.lower()
        words = self._tokenize(question_lower)

        if self._mentions(words, ("time",)):
            return f"The current time is {time.strftime('%H:%M:%S')}"

        if self._mentions(words, ("date",)):
            return f"Today's date is {time.strftime('%Y-%m-%d')}"

        if self._mentions(words, ("name",)):
            return "I'm ChatAgent, your friendly AI assistant!"

        if (self._mentions(words, ("purpose",))
                or "what can you do" in question_lower):
            return ("I can chat with you, answer questions, and coordinate "
                    "with other specialized agents for weather, "
                    "calculations, translations, and more!")

        responses = [
            "That's a good question! Let me think about it...",
            "I'm not entirely sure about that, but I'd be happy to help "
            "you find out!",
            "Interesting question! Here's what I know about that topic...",
            "I understand your curiosity about that subject."
        ]
        return random.choice(responses)
\ No newline at end of file
diff --git a/Agent_module/Framework.python b/Agent_module/Framework.python
new file mode 100644
index 0000000000..d7c5e6b902
--- /dev/null
+++ b/Agent_module/Framework.python
@@ -0,0 +1,78 @@
+import asyncio
+import time
+from abc import ABC, abstractmethod
+from typing import Any, Dict, List, Optional, Callable
+from dataclasses import dataclass
+from enum import Enum
+import random
+import json
+
class AgentStatus(Enum):
    """States an agent can be in; each value is its lower-case name."""

    IDLE = "idle"              # not currently handling a message
    PROCESSING = "processing"  # a message is being handled
    ERROR = "error"            # handling the last message raised
    STOPPED = "stopped"
+
@dataclass
class AgentMessage:
    """A single message exchanged with an agent.

    Attributes are documented inline. ``timestamp`` and ``metadata`` may
    be passed as ``None``; they are filled in by ``__post_init__``.
    """

    content: Any                               # payload; any type
    sender: str                                # name of the originator
    timestamp: Optional[float]                 # seconds since the epoch
    message_type: str = "text"
    metadata: Optional[Dict[str, Any]] = None  # replaced by {} when None

    def __post_init__(self):
        """Fill in defaults for fields that were passed as ``None``."""
        if self.metadata is None:
            # A fresh dict per instance, so messages never share metadata.
            self.metadata = {}
        if self.timestamp is None:
            # Only a missing timestamp is defaulted. An explicit 0 / 0.0
            # is a real point in time and must be kept as given.
            self.timestamp = time.time()
+
class BaseAgent(ABC):
    """Base class for all agents.

    Subclasses implement ``_process_impl``. Callers use ``process``,
    which records the incoming message, tracks ``status``, converts
    exceptions into an ``"error"`` message and notifies callbacks after
    a successful run.
    """

    def __init__(self, name: str, description: str = ""):
        self.name = name
        self.description = description
        self.status = AgentStatus.IDLE
        # Every message passed to process(), oldest first.
        self.message_history: List[AgentMessage] = []
        self._callbacks: List[Callable] = []

    async def process(self, message: AgentMessage) -> AgentMessage:
        """Process a message and return the response.

        Exceptions raised by ``_process_impl`` are not propagated: the
        status becomes ``ERROR`` and a message of type ``"error"`` is
        returned instead. Callbacks are invoked only on success.
        """
        self.status = AgentStatus.PROCESSING
        self.message_history.append(message)

        try:
            result = await self._process_impl(message)
        except asyncio.CancelledError:
            # Cancellation is not a processing failure; do not leave the
            # agent stuck in PROCESSING once the task is torn down.
            self.status = AgentStatus.IDLE
            raise
        except Exception as e:
            self.status = AgentStatus.ERROR
            return AgentMessage(
                content=f"Error: {str(e)}",
                sender=self.name,
                timestamp=time.time(),
                message_type="error"
            )

        self.status = AgentStatus.IDLE
        self._notify_callbacks(message, result)
        return result

    @abstractmethod
    async def _process_impl(self, message: AgentMessage) -> AgentMessage:
        """Agent-specific processing logic"""
        pass

    def add_callback(self, callback: Callable):
        """Register ``callback(input_msg, output_msg)`` for processed messages."""
        self._callbacks.append(callback)

    def _notify_callbacks(self, input_msg: AgentMessage,
                          output_msg: AgentMessage):
        """Invoke every registered callback, isolating their failures."""
        # Local import: the file's import list does not include logging.
        import logging

        logger = logging.getLogger(__name__)
        for callback in self._callbacks:
            try:
                callback(input_msg, output_msg)
            except Exception:
                # A failing callback must not break the agent, but it
                # should not disappear without a trace either.
                logger.exception(
                    "Callback %r failed for agent %s", callback, self.name
                )

    def get_recent_messages(self, count: int = 10) -> List[AgentMessage]:
        """Return up to ``count`` most recent messages, oldest first.

        A non-positive ``count`` yields an empty list.
        """
        if count <= 0:
            # history[-0:] is history[0:], i.e. the whole list.
            return []
        return self.message_history[-count:]
\ No newline at end of file
diff --git a/Agent_module/Implementation.python
b/Agent_module/Implementation.python
new file mode 100644
index 0000000000..59a806ce50
--- /dev/null
+++ b/Agent_module/Implementation.python
@@ -0,0 +1,119 @@
class EchoAgent(BaseAgent):
    """Agent whose reply is the incoming content prefixed with ``Echo: ``."""

    def __init__(self, name: str = "EchoAgent"):
        super().__init__(name, "Echoes back received messages")

    async def _process_impl(self, message: AgentMessage) -> AgentMessage:
        """Wait briefly, then return the content as an ``echo_response``."""
        # Short pause standing in for real processing time.
        await asyncio.sleep(0.1)

        echoed = "Echo: {}".format(message.content)
        reply = AgentMessage(
            content=echoed,
            sender=self.name,
            timestamp=time.time(),
            message_type="echo_response"
        )
        return reply
+
class CalculatorAgent(BaseAgent):
    """Agent that evaluates basic arithmetic expressions.

    Supported: integer and decimal literals, parentheses, unary ``+``/``-``
    and the binary operators ``+ - * / // **``.
    """

    # Limits for ``**``. The character whitelist alone still lets through
    # inputs such as "9**9**9**9", which would otherwise try to build an
    # astronomically large integer and hang the process.
    MAX_EXPONENT = 1000
    MAX_POWER_BITS = 100000

    def __init__(self, name: str = "CalculatorAgent"):
        super().__init__(name, "Performs mathematical calculations")

    async def _process_impl(self, message: AgentMessage) -> AgentMessage:
        """Evaluate ``message.content`` and report ``"<expr> = <result>"``.

        Any failure (bad characters, syntax error, division by zero, ...)
        is returned as an ``"error"`` message starting with
        ``"Calculation error: "`` rather than raised.
        """
        try:
            expression = str(message.content).strip()
            # Security: Only allow basic math operations
            allowed_chars = set('0123456789+-*/.() ')
            if not all(c in allowed_chars for c in expression):
                raise ValueError("Invalid characters in expression")

            # SECURITY NOTE: this used to be eval(expression). The input
            # comes from users, so it is now parsed and evaluated node by
            # node and nothing but arithmetic can ever execute.
            result = self._evaluate(expression)

            return AgentMessage(
                content=f"{expression} = {result}",
                sender=self.name,
                timestamp=time.time(),
                message_type="calculation_result"
            )
        except Exception as e:
            return AgentMessage(
                content=f"Calculation error: {str(e)}",
                sender=self.name,
                timestamp=time.time(),
                message_type="error"
            )

    @classmethod
    def _evaluate(cls, expression: str):
        """Evaluate an arithmetic expression without using ``eval``.

        Raises ``ValueError`` for unsupported syntax or oversized powers,
        and ``SyntaxError`` for text that is not a valid expression.
        """
        # Local imports: the surrounding file does not import these.
        import ast
        import operator

        binary_ops = {
            ast.Add: operator.add,
            ast.Sub: operator.sub,
            ast.Mult: operator.mul,
            ast.Div: operator.truediv,
            ast.FloorDiv: operator.floordiv,
            ast.Pow: operator.pow,
        }
        unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}

        def walk(node):
            if isinstance(node, ast.Expression):
                return walk(node.body)
            if isinstance(node, ast.Constant) and type(node.value) in (int, float):
                return node.value
            if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
                return unary_ops[type(node.op)](walk(node.operand))
            if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
                left = walk(node.left)
                right = walk(node.right)
                if isinstance(node.op, ast.Pow):
                    if abs(right) > cls.MAX_EXPONENT:
                        raise ValueError("Exponent too large")
                    if (isinstance(left, int) and isinstance(right, int)
                            and right > 0
                            and left.bit_length() * right > cls.MAX_POWER_BITS):
                        raise ValueError("Result too large")
                return binary_ops[type(node.op)](left, right)
            raise ValueError("Unsupported expression")

        return walk(ast.parse(expression, mode="eval"))
+
class WeatherAgent(BaseAgent):
    """Weather agent backed by a small in-memory table.

    Locations missing from the table get randomly generated values and
    are labelled ``(simulated)`` in the report.
    """

    def __init__(self, name: str = "WeatherAgent"):
        super().__init__(name, "Provides weather information")
        # Keys are title-cased city names; temperatures are printed as °C.
        self.weather_data = {
            "New York": {"temp": 22, "condition": "Sunny", "humidity": 65},
            "London": {"temp": 15, "condition": "Cloudy", "humidity": 80},
            "Tokyo": {"temp": 18, "condition": "Rainy", "humidity": 75},
            "Sydney": {"temp": 25, "condition": "Clear", "humidity": 60}
        }

    async def _process_impl(self, message: AgentMessage) -> AgentMessage:
        """Return a four-line weather report for the location in the message."""
        # Normalise the request to the table's key format.
        location = str(message.content).strip().title()

        # Pause standing in for a call to a weather API.
        await asyncio.sleep(0.2)

        known = self.weather_data.get(location)
        if known is not None:
            heading = f"Weather in {location}:"
            temp = known["temp"]
            condition = known["condition"]
            humidity = known["humidity"]
        else:
            # Unknown location: make up plausible values.
            heading = f"Weather in {location} (simulated):"
            temp = random.randint(-10, 35)
            condition = random.choice(["Sunny", "Cloudy", "Rainy", "Snowy"])
            humidity = random.randint(30, 95)

        report = "\n".join([
            heading,
            f"Temperature: {temp}°C",
            f"Condition: {condition}",
            f"Humidity: {humidity}%",
        ])

        return AgentMessage(
            content=report,
            sender=self.name,
            timestamp=time.time(),
            message_type="weather_report"
        )
+
class TranslatorAgent(BaseAgent):
    """Simulated translation agent.

    No translation is performed: the reply is a placeholder string that
    names the requested target language.
    """

    def __init__(self, name: str = "TranslatorAgent"):
        super().__init__(name, "Translates text between languages")
        self.supported_languages = ["english", "spanish", "french", "german",
                                    "italian"]

    async def _process_impl(self, message: AgentMessage) -> AgentMessage:
        """Build the placeholder translation for ``message.content``.

        The target language is read from ``metadata["target_language"]``
        (case-insensitive, surrounding whitespace ignored) and defaults to
        ``"spanish"`` when the key is missing, ``None`` or blank.
        """
        text = str(message.content)

        # Coerce before normalising: a None or non-string value used to
        # raise AttributeError on .lower().
        requested = (message.metadata or {}).get("target_language")
        target_language = str(requested or "spanish").strip().lower() or "spanish"

        await asyncio.sleep(0.3)  # Simulate translation processing

        if target_language not in self.supported_languages:
            response = (
                f"Sorry, I don't support {target_language}. Supported "
                f"languages: {', '.join(self.supported_languages)}"
            )
        else:
            # Simulated translation - in real world, this would call a
            # translation API
            response = (
                f"[{target_language.upper()} TRANSLATION] {text} -> "
                f"Translated text in {target_language}"
            )

        return AgentMessage(
            content=response,
            sender=self.name,
            timestamp=time.time(),
            message_type="translation"
        )
\ No newline at end of file
diff --git a/Agent_module/Manager.python b/Agent_module/Manager.python
new file mode 100644
index 0000000000..ff51910795
--- /dev/null
+++ b/Agent_module/Manager.python
@@ -0,0 +1,103 @@
class AgentManager:
    """Registry that routes messages to agents and logs the exchanges."""

    def __init__(self):
        # Registered agents keyed by their ``name``.
        self.agents: Dict[str, BaseAgent] = {}
        # One entry per interaction reported through the logging callback.
        self.conversation_log: List[Dict] = []

    def register_agent(self, agent: BaseAgent):
        """Register ``agent`` under its name and attach the logging callback.

        Registering another agent with the same name replaces the earlier
        one. Registering the same agent object again does not attach a
        second callback, so each interaction is logged once.
        """
        self.agents[agent.name] = agent

        # Bound methods compare equal when they wrap the same function on
        # the same object, so ``in`` detects an earlier registration.
        if self._log_interaction not in getattr(agent, "_callbacks", ()):
            agent.add_callback(self._log_interaction)

    async def send_message(self, agent_name: str, content: Any,
                           metadata: Optional[Dict[str, Any]] = None
                           ) -> AgentMessage:
        """Send a message to one agent and return its response.

        Raises:
            ValueError: if no agent named ``agent_name`` is registered.
        """
        if agent_name not in self.agents:
            raise ValueError(f"Agent '{agent_name}' not found")

        message = AgentMessage(
            content=content,
            sender="User",
            timestamp=time.time(),
            metadata=metadata or {}
        )

        return await self.agents[agent_name].process(message)

    async def broadcast_message(self, content: Any,
                                metadata: Optional[Dict[str, Any]] = None
                                ) -> Dict[str, AgentMessage]:
        """Send the same content to every agent concurrently.

        Returns a dict mapping agent name to its response. An exception
        escaping an agent is converted into an ``"error"`` message for
        that agent.
        """
        # Snapshot the registry so results are paired with the agents that
        # were actually messaged, even if registrations change during the
        # await below.
        targets = list(self.agents.items())
        sent_at = time.time()
        base_metadata = metadata or {}

        # Each agent gets its own message object and metadata dict, so one
        # agent mutating its copy cannot affect the others.
        tasks = [
            agent.process(AgentMessage(
                content=content,
                sender="Broadcast",
                timestamp=sent_at,
                metadata=dict(base_metadata)
            ))
            for _, agent in targets
        ]

        responses = await asyncio.gather(*tasks, return_exceptions=True)

        results = {}
        for (agent_name, _), response in zip(targets, responses):
            if isinstance(response, Exception):
                results[agent_name] = AgentMessage(
                    content=f"Error: {str(response)}",
                    sender=agent_name,
                    timestamp=time.time(),
                    message_type="error"
                )
            else:
                results[agent_name] = response

        return results

    def find_agent_by_capability(self, keyword: str) -> List[str]:
        """Return names of agents whose name or description contains
        ``keyword`` (case-insensitive substring match)."""
        needle = keyword.lower()
        return [
            agent_name
            for agent_name, agent in self.agents.items()
            if needle in agent.name.lower()
            or needle in agent.description.lower()
        ]

    def _log_interaction(self, input_msg: AgentMessage,
                         output_msg: AgentMessage):
        """Callback: append one input/output pair to ``conversation_log``."""
        log_entry = {
            "timestamp": time.time(),
            "input": {
                "content": input_msg.content,
                "sender": input_msg.sender,
                "type": input_msg.message_type
            },
            "output": {
                "content": output_msg.content,
                "sender": output_msg.sender,
                "type": output_msg.message_type
            },
            # Difference between the two messages' own timestamps.
            "processing_time": output_msg.timestamp - input_msg.timestamp
        }
        self.conversation_log.append(log_entry)

    def get_agent_statuses(self) -> Dict[str, str]:
        """Return each agent's current status value, keyed by agent name."""
        return {name: agent.status.value
                for name, agent in self.agents.items()}

    def get_system_stats(self) -> Dict[str, Any]:
        """Return agent count, message totals, log size and statuses."""
        total_messages = sum(len(agent.message_history)
                             for agent in self.agents.values())

        return {
            "total_agents": len(self.agents),
            "total_messages_processed": total_messages,
            "conversation_log_entries": len(self.conversation_log),
            "agent_statuses": self.get_agent_statuses()
        }
\ No newline at end of file