This is an automated email from the ASF dual-hosted git repository.

wuxinfan pushed a commit to branch a2a-2025
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git


The following commit(s) were added to refs/heads/a2a-2025 by this push:
     new 78989b19 feat(a2a): implement A2A Server Filter (#771)
78989b19 is described below

commit 78989b1914fb3a3d96f847c653368590b2a39534
Author: Zerui Yang <[email protected]>
AuthorDate: Thu Oct 30 15:02:52 2025 +0800

    feat(a2a): implement A2A Server Filter (#771)
    
    * feat(a2a): implement A2A Server Filter with configuration and task 
management
    
    * refactor(filter): reorganize import statements for clarity
    
    * fix: fix the err
    
    * feat(filter): add A2A task and message management with tests
    
    * fix: standardize cancellation terminology from "cancelled" to "canceled"
    
    * fix: standardize cancellation terminology from "cancelled" to "canceled"
---
 pkg/common/constant/key.go          |   3 +
 pkg/filter/a2a/config.go            | 195 ++++++++
 pkg/filter/a2a/constants.go         |  82 ++++
 pkg/filter/a2a/filter.go            | 856 ++++++++++++++++++++++++++++++++++++
 pkg/filter/a2a/filter_test.go       | 360 +++++++++++++++
 pkg/filter/a2a/plugin.go            |  39 ++
 pkg/filter/a2a/task_manager.go      | 228 ++++++++++
 pkg/filter/a2a/task_manager_test.go | 226 ++++++++++
 pkg/filter/a2a/types.go             | 202 +++++++++
 pkg/pluginregistry/registry.go      |   1 +
 10 files changed, 2192 insertions(+)

diff --git a/pkg/common/constant/key.go b/pkg/common/constant/key.go
index 0400250c..09fa037e 100644
--- a/pkg/common/constant/key.go
+++ b/pkg/common/constant/key.go
@@ -61,6 +61,9 @@ const (
        LLMTokenizerFilter = "dgp.filter.llm.tokenizer"
 
        MCPServerFilter = "dgp.filter.mcp.mcpserver"
+
+       // A2A Filters
+       A2AServerFilter = "dgp.filter.a2a.server"
 )
 
 const (
diff --git a/pkg/filter/a2a/config.go b/pkg/filter/a2a/config.go
new file mode 100644
index 00000000..d01f3390
--- /dev/null
+++ b/pkg/filter/a2a/config.go
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package a2a
+
+// Config represents the configuration for the A2A Server Filter
+type Config struct {
+       // Endpoint specifies the A2A service endpoint path
+       Endpoint string `yaml:"endpoint" json:"endpoint" default:"/a2a"`
+
+       // AgentInfo contains information about the current agent
+       AgentInfo *AgentConfig `yaml:"agent_info" json:"agent_info"`
+
+       // KnownAgents contains a list of statically configured known agents
+       KnownAgents []AgentConfig `yaml:"known_agents" 
json:"known_agents,omitempty"`
+
+       // TaskConfig contains task management related configuration
+       TaskConfig *TaskConfig `yaml:"task_config" json:"task_config,omitempty"`
+}
+
+// AgentConfig represents configuration for an agent
+type AgentConfig struct {
+       // AgentID is the unique identifier for the agent
+       AgentID string `yaml:"agent_id" json:"agent_id"`
+
+       // Name is the human-readable name of the agent
+       Name string `yaml:"name" json:"name"`
+
+       // Version is the version of the agent
+       Version string `yaml:"version" json:"version" default:"1.0.0"`
+
+       // Description is an optional description of the agent
+       Description string `yaml:"description" json:"description,omitempty"`
+
+       // Endpoint is the HTTP endpoint where the agent can be reached
+       Endpoint string `yaml:"endpoint" json:"endpoint"`
+
+       // Status is the current status of the agent
+       Status AgentStatus `yaml:"status" json:"status" default:"online"`
+
+       // Capabilities lists the capabilities that the agent provides
+       Capabilities []CapabilityConfig `yaml:"capabilities" 
json:"capabilities,omitempty"`
+
+       // Metadata contains additional metadata about the agent
+       Metadata map[string]any `yaml:"metadata" json:"metadata,omitempty"`
+}
+
+// CapabilityConfig represents configuration for an agent capability
+type CapabilityConfig struct {
+       // Name is the unique name of the capability
+       Name string `yaml:"name" json:"name"`
+
+       // Description is an optional description of the capability
+       Description string `yaml:"description" json:"description,omitempty"`
+
+       // InputTypes specifies the types of input this capability accepts
+       InputTypes []string `yaml:"input_types" json:"input_types,omitempty"`
+
+       // OutputTypes specifies the types of output this capability produces
+       OutputTypes []string `yaml:"output_types" json:"output_types,omitempty"`
+
+       // Tags are labels that can be used to categorize and discover 
capabilities
+       Tags []string `yaml:"tags" json:"tags,omitempty"`
+
+       // Parameters defines the parameters that this capability accepts
+       Parameters []ParameterConfig `yaml:"parameters" 
json:"parameters,omitempty"`
+}
+
+// ParameterConfig represents configuration for a capability parameter
+type ParameterConfig struct {
+       // Name is the parameter name
+       Name string `yaml:"name" json:"name"`
+
+       // Type is the parameter type (string, number, boolean, object, array)
+       Type string `yaml:"type" json:"type" default:"string"`
+
+       // Description is an optional description of the parameter
+       Description string `yaml:"description" json:"description,omitempty"`
+
+       // Required indicates whether this parameter is required
+       Required bool `yaml:"required" json:"required" default:"false"`
+
+       // Default is the default value for the parameter
+       Default any `yaml:"default" json:"default,omitempty"`
+
+       // Enum lists the allowed values for the parameter (if applicable)
+       Enum []any `yaml:"enum" json:"enum,omitempty"`
+}
+
+// TaskConfig represents configuration for task management
+type TaskConfig struct {
+       // DefaultTimeout is the default timeout for tasks in milliseconds
+       DefaultTimeout int64 `yaml:"default_timeout" json:"default_timeout" 
default:"30000"`
+
+       // MaxConcurrentTasks is the maximum number of concurrent tasks
+       MaxConcurrentTasks int `yaml:"max_concurrent_tasks" 
json:"max_concurrent_tasks" default:"100"`
+
+       // CleanupInterval is the interval for cleaning up completed/expired 
tasks in milliseconds
+       CleanupInterval int64 `yaml:"cleanup_interval" json:"cleanup_interval" 
default:"300000"`
+
+       // TaskRetention is how long to keep completed tasks in milliseconds
+       TaskRetention int64 `yaml:"task_retention" json:"task_retention" 
default:"3600000"`
+}
+
+// GetDefaultAgentInfo returns a default agent configuration
+func (c *Config) GetDefaultAgentInfo() *AgentInfo {
+       if c.AgentInfo == nil {
+               return &AgentInfo{
+                       AgentID:      "default-a2a-agent",
+                       Name:         "Default A2A Agent",
+                       Version:      DefaultAgentVersion,
+                       Description:  "A default A2A agent",
+                       Endpoint:     c.Endpoint,
+                       Status:       StatusOnline,
+                       Capabilities: []Capability{},
+                       Metadata:     make(map[string]any),
+               }
+       }
+
+       return c.AgentInfo.ToAgentInfo(c.Endpoint)
+}
+
+// ToAgentInfo converts AgentConfig to AgentInfo
+func (ac *AgentConfig) ToAgentInfo(defaultEndpoint string) *AgentInfo {
+       endpoint := ac.Endpoint
+       if endpoint == "" {
+               endpoint = defaultEndpoint
+       }
+
+       capabilities := make([]Capability, len(ac.Capabilities))
+       for i, cap := range ac.Capabilities {
+               capabilities[i] = cap.ToCapability()
+       }
+
+       return &AgentInfo{
+               AgentID:      ac.AgentID,
+               Name:         ac.Name,
+               Version:      ac.Version,
+               Description:  ac.Description,
+               Endpoint:     endpoint,
+               Status:       ac.Status,
+               Capabilities: capabilities,
+               Metadata:     ac.Metadata,
+       }
+}
+
+// ToCapability converts CapabilityConfig to Capability
+func (cc *CapabilityConfig) ToCapability() Capability {
+       parameters := make([]Parameter, len(cc.Parameters))
+       for i, param := range cc.Parameters {
+               parameters[i] = Parameter{
+                       Name:        param.Name,
+                       Type:        param.Type,
+                       Description: param.Description,
+                       Required:    param.Required,
+                       Default:     param.Default,
+               }
+       }
+
+       return Capability{
+               Name:        cc.Name,
+               Description: cc.Description,
+               InputTypes:  cc.InputTypes,
+               OutputTypes: cc.OutputTypes,
+               Tags:        cc.Tags,
+               Parameters:  parameters,
+       }
+}
+
+// GetTaskConfig returns the task configuration or default values
+func (c *Config) GetTaskConfig() *TaskConfig {
+       if c.TaskConfig == nil {
+               return &TaskConfig{
+                       DefaultTimeout:     DefaultTaskTimeout,
+                       MaxConcurrentTasks: DefaultMaxConcurrentTasks,
+                       CleanupInterval:    DefaultTaskCleanupInterval,
+                       TaskRetention:      DefaultTaskCleanupInterval * 12, // 
1 hour default
+               }
+       }
+       return c.TaskConfig
+}
diff --git a/pkg/filter/a2a/constants.go b/pkg/filter/a2a/constants.go
new file mode 100644
index 00000000..8a73b83b
--- /dev/null
+++ b/pkg/filter/a2a/constants.go
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package a2a
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+)
+
+const (
+       // Kind is the type identifier for A2A Server Filter
+       Kind = constant.A2AServerFilter
+
+       // DefaultEndpoint is the default A2A service endpoint
+       DefaultEndpoint = "/a2a"
+
+       // DefaultAgentVersion is the default agent version
+       DefaultAgentVersion = "1.0.0"
+
+       // JSON-RPC version constant
+       JSONRPCVersion = "2.0"
+
+       // A2A Protocol Methods
+
+       // Core methods
+       MethodPing           = "a2a.ping"
+       MethodGetAgentCard   = "a2a.get_agent_card"
+       MethodDiscoverAgents = "a2a.discover_agents"
+
+       // Task management methods
+       MethodCreateTask    = "a2a.create_task"
+       MethodGetTaskStatus = "a2a.get_task_status"
+       MethodUpdateTask    = "a2a.update_task"
+       MethodCancelTask    = "a2a.cancel_task"
+
+       // Communication methods
+       MethodSendMessage      = "a2a.send_message"
+       MethodBroadcastMessage = "a2a.broadcast_message"
+
+       // Agent status constants
+       AgentStatusOnline  = "online"
+       AgentStatusOffline = "offline"
+       AgentStatusBusy    = "busy"
+
+       // Task status constants
+       TaskStatusPending   = "pending"
+       TaskStatusRunning   = "running"
+       TaskStatusCompleted = "completed"
+       TaskStatusFailed    = "failed"
+       TaskStatusCancelled = "canceled"
+
+       // Error codes for JSON-RPC errors
+       ErrorCodeParseError     = -32700
+       ErrorCodeInvalidRequest = -32600
+       ErrorCodeMethodNotFound = -32601
+       ErrorCodeInvalidParams  = -32602
+       ErrorCodeInternalError  = -32603
+
+       // A2A specific error codes
+       ErrorCodeAgentNotFound = -32001
+       ErrorCodeTaskNotFound  = -32002
+       ErrorCodeTaskTimeout   = -32003
+
+       // Default configuration values
+       DefaultTaskTimeout         = 30000 // 30 seconds in milliseconds
+       DefaultMaxConcurrentTasks  = 100
+       DefaultTaskCleanupInterval = 300000 // 5 minutes in milliseconds
+)
diff --git a/pkg/filter/a2a/filter.go b/pkg/filter/a2a/filter.go
new file mode 100644
index 00000000..e737de44
--- /dev/null
+++ b/pkg/filter/a2a/filter.go
@@ -0,0 +1,856 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package a2a
+
+import (
+       "bytes"
+       "encoding/json"
+       "fmt"
+       "io"
+       "net/http"
+       "strings"
+       "sync"
+       "time"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
+       contexthttp "github.com/apache/dubbo-go-pixiu/pkg/context/http"
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+// FilterFactory is the factory for creating A2A filter instances
+type FilterFactory struct {
+       cfg         *Config
+       agentMap    map[string]*AgentInfo
+       taskManager *TaskManager
+       mutex       sync.RWMutex
+}
+
+// Config returns the configuration struct for the factory
+func (f *FilterFactory) Config() any {
+       return f.cfg
+}
+
+// Apply initializes the filter factory from its configuration
+func (f *FilterFactory) Apply() error {
+       logger.Infof("[dubbo-go-pixiu] A2A Server Filter factory applying 
configuration")
+
+       // Initialize agent map
+       f.agentMap = make(map[string]*AgentInfo)
+
+       // Load current agent info
+       agentInfo := f.cfg.GetDefaultAgentInfo()
+       f.agentMap[agentInfo.AgentID] = agentInfo
+
+       // Load known agents from configuration
+       for _, agentConfig := range f.cfg.KnownAgents {
+               agent := agentConfig.ToAgentInfo(f.cfg.Endpoint)
+               f.agentMap[agent.AgentID] = agent
+               logger.Infof("[dubbo-go-pixiu] A2A Server loaded known agent: 
%s (%s)", agent.Name, agent.AgentID)
+       }
+
+       // Initialize task manager
+       taskConfig := f.cfg.GetTaskConfig()
+       f.taskManager = NewTaskManager(taskConfig)
+
+       logger.Infof("[dubbo-go-pixiu] A2A Server Filter factory initialized 
with %d agents", len(f.agentMap))
+       return nil
+}
+
+// PrepareFilterChain prepares the filter chain for a new request
+func (f *FilterFactory) PrepareFilterChain(_ *contexthttp.HttpContext, chain 
filter.FilterChain) error {
+       a2aFilter := &A2AFilter{
+               cfg:         f.cfg,
+               agentMap:    f.agentMap,
+               taskManager: f.taskManager,
+               mutex:       &f.mutex,
+       }
+
+       chain.AppendDecodeFilters(a2aFilter)
+       return nil
+}
+
+// A2AFilter is the actual filter that processes A2A requests
+type A2AFilter struct {
+       cfg         *Config
+       agentMap    map[string]*AgentInfo
+       taskManager *TaskManager
+       mutex       *sync.RWMutex
+}
+
+// Decode processes incoming HTTP requests for A2A protocol
+func (f *A2AFilter) Decode(ctx *contexthttp.HttpContext) filter.FilterStatus {
+       // Check if this is an A2A request
+       if !f.isA2ARequest(ctx) {
+               return filter.Continue
+       }
+
+       logger.Infof("[dubbo-go-pixiu] A2A Server processing request: %s %s", 
ctx.Request.Method, ctx.Request.URL.Path)
+
+       // Read request body
+       body, err := io.ReadAll(ctx.Request.Body)
+       if err != nil {
+               logger.Errorf("[dubbo-go-pixiu] A2A Server failed to read 
request body: %v", err)
+               f.sendErrorResponse(ctx, nil, ErrorCodeInternalError, "Failed 
to read request body")
+               return filter.Stop
+       }
+
+       // Parse JSON-RPC request
+       var jsonrpcReq JSONRPCRequest
+       if err := json.Unmarshal(body, &jsonrpcReq); err != nil {
+               logger.Errorf("[dubbo-go-pixiu] A2A Server failed to parse 
JSON-RPC request: %v", err)
+               f.sendErrorResponse(ctx, nil, ErrorCodeParseError, "Invalid 
JSON-RPC request")
+               return filter.Stop
+       }
+
+       // Validate JSON-RPC version
+       if jsonrpcReq.JSONRPC != JSONRPCVersion {
+               logger.Warnf("[dubbo-go-pixiu] A2A Server invalid JSON-RPC 
version: %s", jsonrpcReq.JSONRPC)
+               f.sendErrorResponse(ctx, jsonrpcReq.ID, 
ErrorCodeInvalidRequest, "Invalid JSON-RPC version")
+               return filter.Stop
+       }
+
+       logger.Infof("[dubbo-go-pixiu] A2A Server received method: %s (id: 
%v)", jsonrpcReq.Method, jsonrpcReq.ID)
+
+       // Process the A2A method
+       response := f.processA2AMethod(&jsonrpcReq)
+
+       // Send response
+       f.sendJSONResponse(ctx, response)
+       return filter.Stop
+}
+
+// isA2ARequest checks if the incoming request is an A2A request
+func (f *A2AFilter) isA2ARequest(ctx *contexthttp.HttpContext) bool {
+       // Check if the request path matches the A2A endpoint
+       if !strings.HasPrefix(ctx.Request.URL.Path, f.cfg.Endpoint) {
+               return false
+       }
+
+       // Check if it's a POST request with JSON content
+       if ctx.Request.Method != "POST" {
+               return false
+       }
+
+       contentType := ctx.Request.Header.Get("Content-Type")
+       return strings.Contains(contentType, "application/json")
+}
+
+// processA2AMethod processes different A2A protocol methods
+func (f *A2AFilter) processA2AMethod(req *JSONRPCRequest) *JSONRPCResponse {
+       switch req.Method {
+       case MethodPing:
+               return f.handlePing(req)
+       case MethodGetAgentCard:
+               return f.handleGetAgentCard(req)
+       case MethodDiscoverAgents:
+               return f.handleDiscoverAgents(req)
+       case MethodCreateTask:
+               return f.handleCreateTask(req)
+       case MethodGetTaskStatus:
+               return f.handleGetTaskStatus(req)
+       case MethodUpdateTask:
+               return f.handleUpdateTask(req)
+       case MethodCancelTask:
+               return f.handleCancelTask(req)
+       case MethodSendMessage:
+               return f.handleSendMessage(req)
+       case MethodBroadcastMessage:
+               return f.handleBroadcastMessage(req)
+       default:
+               logger.Warnf("[dubbo-go-pixiu] A2A Server unsupported method: 
%s", req.Method)
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeMethodNotFound,
+                               Message: "Method not found",
+                       },
+               }
+       }
+}
+
+// Basic method handlers (placeholder implementations for phase 1)
+
+// handlePing handles ping requests
+func (f *A2AFilter) handlePing(req *JSONRPCRequest) *JSONRPCResponse {
+       f.mutex.RLock()
+       agentInfo := f.cfg.GetDefaultAgentInfo()
+       f.mutex.RUnlock()
+
+       response := PingResponse{
+               Status:    "pong",
+               AgentID:   agentInfo.AgentID,
+               Timestamp: time.Now().UnixMilli(),
+       }
+
+       return &JSONRPCResponse{
+               JSONRPC: JSONRPCVersion,
+               ID:      req.ID,
+               Result:  response,
+       }
+}
+
+// handleGetAgentCard handles get agent card requests
+func (f *A2AFilter) handleGetAgentCard(req *JSONRPCRequest) *JSONRPCResponse {
+       f.mutex.RLock()
+       agentInfo := f.cfg.GetDefaultAgentInfo()
+       f.mutex.RUnlock()
+
+       return &JSONRPCResponse{
+               JSONRPC: JSONRPCVersion,
+               ID:      req.ID,
+               Result:  agentInfo,
+       }
+}
+
+// handleDiscoverAgents handles agent discovery requests
+func (f *A2AFilter) handleDiscoverAgents(req *JSONRPCRequest) *JSONRPCResponse 
{
+       f.mutex.RLock()
+       defer f.mutex.RUnlock()
+
+       // For now, return all known agents
+       var agents []AgentInfo
+       for _, agent := range f.agentMap {
+               agents = append(agents, *agent)
+       }
+
+       response := DiscoverAgentsResponse{
+               Agents: agents,
+       }
+
+       return &JSONRPCResponse{
+               JSONRPC: JSONRPCVersion,
+               ID:      req.ID,
+               Result:  response,
+       }
+}
+
+// Placeholder implementations for other methods (to be implemented in later 
phases)
+
+func (f *A2AFilter) handleCreateTask(req *JSONRPCRequest) *JSONRPCResponse {
+       // Parse request parameters
+       paramsBytes, err := json.Marshal(req.Params)
+       if err != nil {
+               logger.Errorf("[dubbo-go-pixiu] A2A Server failed to marshal 
create task params: %v", err)
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeInvalidParams,
+                               Message: "Invalid parameters",
+                       },
+               }
+       }
+
+       var params CreateTaskRequest
+       if err := json.Unmarshal(paramsBytes, &params); err != nil {
+               logger.Errorf("[dubbo-go-pixiu] A2A Server failed to parse 
create task params: %v", err)
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeInvalidParams,
+                               Message: "Invalid task parameters",
+                       },
+               }
+       }
+
+       // Validate required fields
+       if params.To == "" || params.Type == "" || params.Content == nil {
+               logger.Warnf("[dubbo-go-pixiu] A2A Server create task missing 
required fields")
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeInvalidParams,
+                               Message: "Missing required fields: to, type, 
content",
+                       },
+               }
+       }
+
+       // Get current agent info as the sender
+       f.mutex.RLock()
+       agentInfo := f.cfg.GetDefaultAgentInfo()
+       f.mutex.RUnlock()
+
+       // Create task
+       task, err := f.taskManager.CreateTask(agentInfo.AgentID, params.To, 
params.Type, params.Content, params.Timeout)
+       if err != nil {
+               logger.Errorf("[dubbo-go-pixiu] A2A Server failed to create 
task: %v", err)
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeInternalError,
+                               Message: fmt.Sprintf("Failed to create task: 
%v", err),
+                       },
+               }
+       }
+
+       logger.Infof("[dubbo-go-pixiu] A2A Server created task %s: %s -> %s", 
task.TaskID, task.From, task.To)
+
+       // Build response
+       response := CreateTaskResponse{
+               TaskID: task.TaskID,
+               Status: task.Status,
+       }
+
+       return &JSONRPCResponse{
+               JSONRPC: JSONRPCVersion,
+               ID:      req.ID,
+               Result:  response,
+       }
+}
+
+func (f *A2AFilter) handleGetTaskStatus(req *JSONRPCRequest) *JSONRPCResponse {
+       // Parse request parameters
+       paramsBytes, err := json.Marshal(req.Params)
+       if err != nil {
+               logger.Errorf("[dubbo-go-pixiu] A2A Server failed to marshal 
get task status params: %v", err)
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeInvalidParams,
+                               Message: "Invalid parameters",
+                       },
+               }
+       }
+
+       var params GetTaskStatusRequest
+       if err := json.Unmarshal(paramsBytes, &params); err != nil {
+               logger.Errorf("[dubbo-go-pixiu] A2A Server failed to parse get 
task status params: %v", err)
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeInvalidParams,
+                               Message: "Invalid task status parameters",
+                       },
+               }
+       }
+
+       // Validate task ID
+       if params.TaskID == "" {
+               logger.Warnf("[dubbo-go-pixiu] A2A Server get task status 
missing task_id")
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeInvalidParams,
+                               Message: "Missing required field: task_id",
+                       },
+               }
+       }
+
+       // Get task from task manager
+       task, err := f.taskManager.GetTask(params.TaskID)
+       if err != nil {
+               logger.Warnf("[dubbo-go-pixiu] A2A Server task not found: %s", 
params.TaskID)
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeTaskNotFound,
+                               Message: fmt.Sprintf("Task not found: %s", 
params.TaskID),
+                       },
+               }
+       }
+
+       logger.Infof("[dubbo-go-pixiu] A2A Server retrieved task status: %s 
(status: %s)", params.TaskID, task.Status)
+
+       // Build response
+       response := GetTaskStatusResponse{
+               Task: *task,
+       }
+
+       return &JSONRPCResponse{
+               JSONRPC: JSONRPCVersion,
+               ID:      req.ID,
+               Result:  response,
+       }
+}
+
+func (f *A2AFilter) handleUpdateTask(req *JSONRPCRequest) *JSONRPCResponse {
+       // Parse request parameters
+       paramsBytes, err := json.Marshal(req.Params)
+       if err != nil {
+               logger.Errorf("[dubbo-go-pixiu] A2A Server failed to marshal 
update task params: %v", err)
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeInvalidParams,
+                               Message: "Invalid parameters",
+                       },
+               }
+       }
+
+       var params UpdateTaskRequest
+       if err := json.Unmarshal(paramsBytes, &params); err != nil {
+               logger.Errorf("[dubbo-go-pixiu] A2A Server failed to parse 
update task params: %v", err)
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeInvalidParams,
+                               Message: "Invalid update task parameters",
+                       },
+               }
+       }
+
+       // Validate task ID
+       if params.TaskID == "" {
+               logger.Warnf("[dubbo-go-pixiu] A2A Server update task missing 
task_id")
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeInvalidParams,
+                               Message: "Missing required field: task_id",
+                       },
+               }
+       }
+
+       // Update task
+       err = f.taskManager.UpdateTask(params.TaskID, params.Status, 
params.Result, params.Error)
+       if err != nil {
+               logger.Warnf("[dubbo-go-pixiu] A2A Server failed to update task 
%s: %v", params.TaskID, err)
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeTaskNotFound,
+                               Message: fmt.Sprintf("Failed to update task: 
%v", err),
+                       },
+               }
+       }
+
+       logger.Infof("[dubbo-go-pixiu] A2A Server updated task %s to status 
%s", params.TaskID, params.Status)
+
+       // Build response
+       response := UpdateTaskResponse{
+               Success: true,
+               Message: "Task updated successfully",
+       }
+
+       return &JSONRPCResponse{
+               JSONRPC: JSONRPCVersion,
+               ID:      req.ID,
+               Result:  response,
+       }
+}
+
+func (f *A2AFilter) handleCancelTask(req *JSONRPCRequest) *JSONRPCResponse {
+       // Parse request parameters
+       paramsBytes, err := json.Marshal(req.Params)
+       if err != nil {
+               logger.Errorf("[dubbo-go-pixiu] A2A Server failed to marshal 
cancel task params: %v", err)
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeInvalidParams,
+                               Message: "Invalid parameters",
+                       },
+               }
+       }
+
+       var params struct {
+               TaskID string `json:"task_id"`
+       }
+       if err := json.Unmarshal(paramsBytes, &params); err != nil {
+               logger.Errorf("[dubbo-go-pixiu] A2A Server failed to parse 
cancel task params: %v", err)
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeInvalidParams,
+                               Message: "Invalid cancel task parameters",
+                       },
+               }
+       }
+
+       // Validate task ID
+       if params.TaskID == "" {
+               logger.Warnf("[dubbo-go-pixiu] A2A Server cancel task missing 
task_id")
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeInvalidParams,
+                               Message: "Missing required field: task_id",
+                       },
+               }
+       }
+
+       // Cancel task
+       err = f.taskManager.CancelTask(params.TaskID)
+       if err != nil {
+               logger.Warnf("[dubbo-go-pixiu] A2A Server failed to cancel task 
%s: %v", params.TaskID, err)
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeTaskNotFound,
+                               Message: fmt.Sprintf("Failed to cancel task: 
%v", err),
+                       },
+               }
+       }
+
+       logger.Infof("[dubbo-go-pixiu] A2A Server canceled task %s", 
params.TaskID)
+
+       // Build response
+       response := UpdateTaskResponse{
+               Success: true,
+               Message: "Task canceled successfully",
+       }
+
+       return &JSONRPCResponse{
+               JSONRPC: JSONRPCVersion,
+               ID:      req.ID,
+               Result:  response,
+       }
+}
+
+func (f *A2AFilter) handleSendMessage(req *JSONRPCRequest) *JSONRPCResponse {
+       // Parse request parameters
+       paramsBytes, err := json.Marshal(req.Params)
+       if err != nil {
+               logger.Errorf("[dubbo-go-pixiu] A2A Server failed to marshal 
send message params: %v", err)
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeInvalidParams,
+                               Message: "Invalid parameters",
+                       },
+               }
+       }
+
+       var params SendMessageRequest
+       if err := json.Unmarshal(paramsBytes, &params); err != nil {
+               logger.Errorf("[dubbo-go-pixiu] A2A Server failed to parse send 
message params: %v", err)
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeInvalidParams,
+                               Message: "Invalid send message parameters",
+                       },
+               }
+       }
+
+       // Validate required fields
+       if params.To == "" || params.Type == "" || params.Content == nil {
+               logger.Warnf("[dubbo-go-pixiu] A2A Server send message missing 
required fields")
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeInvalidParams,
+                               Message: "Missing required fields: to, type, 
content",
+                       },
+               }
+       }
+
+       // Find target agent
+       f.mutex.RLock()
+       targetAgent, exists := f.agentMap[params.To]
+       agentInfo := f.cfg.GetDefaultAgentInfo()
+       f.mutex.RUnlock()
+
+       if !exists {
+               logger.Warnf("[dubbo-go-pixiu] A2A Server target agent not 
found: %s", params.To)
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeAgentNotFound,
+                               Message: fmt.Sprintf("Agent not found: %s", 
params.To),
+                       },
+               }
+       }
+
+       // Generate message ID
+       messageID := fmt.Sprintf("msg_%d_%d", time.Now().UnixMilli(), 
time.Now().UnixNano()%1000)
+
+       // Build message
+       message := &Message{
+               MessageID: messageID,
+               From:      agentInfo.AgentID,
+               To:        params.To,
+               Type:      params.Type,
+               Content:   params.Content,
+               Timestamp: time.Now().UnixMilli(),
+       }
+
+       // Forward message to agent
+       err = f.forwardMessageToAgent(targetAgent, message)
+       if err != nil {
+               logger.Errorf("[dubbo-go-pixiu] A2A Server failed to forward 
message to %s: %v", params.To, err)
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeInternalError,
+                               Message: fmt.Sprintf("Failed to send message: 
%v", err),
+                       },
+               }
+       }
+
+       logger.Infof("[dubbo-go-pixiu] A2A Server sent message %s to agent %s", 
messageID, params.To)
+
+       // Build response
+       response := SendMessageResponse{
+               MessageID: messageID,
+               Success:   true,
+               Message:   "Message sent successfully",
+       }
+
+       return &JSONRPCResponse{
+               JSONRPC: JSONRPCVersion,
+               ID:      req.ID,
+               Result:  response,
+       }
+}
+
+func (f *A2AFilter) handleBroadcastMessage(req *JSONRPCRequest) 
*JSONRPCResponse {
+       // Parse request parameters
+       paramsBytes, err := json.Marshal(req.Params)
+       if err != nil {
+               logger.Errorf("[dubbo-go-pixiu] A2A Server failed to marshal 
broadcast message params: %v", err)
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeInvalidParams,
+                               Message: "Invalid parameters",
+                       },
+               }
+       }
+
+       var params SendMessageRequest
+       if err := json.Unmarshal(paramsBytes, &params); err != nil {
+               logger.Errorf("[dubbo-go-pixiu] A2A Server failed to parse 
broadcast message params: %v", err)
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeInvalidParams,
+                               Message: "Invalid broadcast message parameters",
+                       },
+               }
+       }
+
+       // Validate required fields
+       if params.Type == "" || params.Content == nil {
+               logger.Warnf("[dubbo-go-pixiu] A2A Server broadcast message 
missing required fields")
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Error: &RPCError{
+                               Code:    ErrorCodeInvalidParams,
+                               Message: "Missing required fields: type, 
content",
+                       },
+               }
+       }
+
+       // Get current agent info
+       f.mutex.RLock()
+       agentInfo := f.cfg.GetDefaultAgentInfo()
+
+       // Collect target agents (online agents excluding self)
+       var targetAgents []*AgentInfo
+       for _, agent := range f.agentMap {
+               if agent.AgentID != agentInfo.AgentID && agent.Status == 
StatusOnline {
+                       targetAgents = append(targetAgents, agent)
+               }
+       }
+       f.mutex.RUnlock()
+
+       totalAgents := len(targetAgents)
+       if totalAgents == 0 {
+               logger.Infof("[dubbo-go-pixiu] A2A Server no online agents to 
broadcast to")
+               return &JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      req.ID,
+                       Result: BroadcastMessageResponse{
+                               TotalAgents:  0,
+                               SuccessCount: 0,
+                               MessageIDs:   []string{},
+                       },
+               }
+       }
+
+       // Use channel to collect results
+       type result struct {
+               agentID   string
+               messageID string
+               err       error
+       }
+       resultCh := make(chan result, totalAgents)
+
+       // Broadcast to all target agents concurrently
+       for _, agent := range targetAgents {
+               go func(targetAgent *AgentInfo) {
+                       // Generate unique message ID
+                       messageID := fmt.Sprintf("msg_%d_%s_%d", 
time.Now().UnixMilli(), targetAgent.AgentID, time.Now().UnixNano()%1000)
+
+                       // Build message
+                       message := &Message{
+                               MessageID: messageID,
+                               From:      agentInfo.AgentID,
+                               To:        targetAgent.AgentID,
+                               Type:      params.Type,
+                               Content:   params.Content,
+                               Timestamp: time.Now().UnixMilli(),
+                       }
+
+                       // Forward message
+                       err := f.forwardMessageToAgent(targetAgent, message)
+                       resultCh <- result{
+                               agentID:   targetAgent.AgentID,
+                               messageID: messageID,
+                               err:       err,
+                       }
+               }(agent)
+       }
+
+       // Collect results
+       var successCount int
+       var failedAgents []string
+       var messageIDs []string
+
+       for i := 0; i < totalAgents; i++ {
+               res := <-resultCh
+               if res.err != nil {
+                       logger.Errorf("[dubbo-go-pixiu] A2A Server failed to 
broadcast to agent %s: %v", res.agentID, res.err)
+                       failedAgents = append(failedAgents, res.agentID)
+               } else {
+                       successCount++
+                       messageIDs = append(messageIDs, res.messageID)
+               }
+       }
+       close(resultCh)
+
+       logger.Infof("[dubbo-go-pixiu] A2A Server broadcast message to %d 
agents: %d succeeded, %d failed",
+               totalAgents, successCount, len(failedAgents))
+
+       // Build response
+       response := BroadcastMessageResponse{
+               TotalAgents:  totalAgents,
+               SuccessCount: successCount,
+               FailedAgents: failedAgents,
+               MessageIDs:   messageIDs,
+       }
+
+       return &JSONRPCResponse{
+               JSONRPC: JSONRPCVersion,
+               ID:      req.ID,
+               Result:  response,
+       }
+}
+
+// Utility methods
+
+// sendErrorResponse sends an error response to the client
+func (f *A2AFilter) sendErrorResponse(ctx *contexthttp.HttpContext, id any, 
code int, message string) {
+       response := &JSONRPCResponse{
+               JSONRPC: JSONRPCVersion,
+               ID:      id,
+               Error: &RPCError{
+                       Code:    code,
+                       Message: message,
+               },
+       }
+       f.sendJSONResponse(ctx, response)
+}
+
+// sendJSONResponse sends a JSON response to the client
+func (f *A2AFilter) sendJSONResponse(ctx *contexthttp.HttpContext, response 
*JSONRPCResponse) {
+       responseBody, err := json.Marshal(response)
+       if err != nil {
+               logger.Errorf("[dubbo-go-pixiu] A2A Server failed to marshal 
response: %v", err)
+               ctx.SendLocalReply(500, []byte("Internal Server Error"))
+               return
+       }
+
+       // SendLocalReply automatically sets Content-Type to application/json 
for valid JSON
+       ctx.SendLocalReply(200, responseBody)
+}
+
+// forwardMessageToAgent forwards a message to a specific agent via HTTP
+func (f *A2AFilter) forwardMessageToAgent(agent *AgentInfo, message *Message) 
error {
+       // Build JSON-RPC request for receiving message
+       jsonrpcReq := JSONRPCRequest{
+               JSONRPC: JSONRPCVersion,
+               Method:  "a2a.receive_message",
+               Params:  message,
+               ID:      fmt.Sprintf("msg_%d", time.Now().UnixMilli()),
+       }
+
+       // Marshal request body
+       reqBody, err := json.Marshal(jsonrpcReq)
+       if err != nil {
+               return fmt.Errorf("failed to marshal message request: %v", err)
+       }
+
+       // Create HTTP request
+       httpReq, err := http.NewRequest("POST", agent.Endpoint, 
bytes.NewReader(reqBody))
+       if err != nil {
+               return fmt.Errorf("failed to create HTTP request: %v", err)
+       }
+
+       // Set headers
+       httpReq.Header.Set("Content-Type", "application/json")
+
+       // Create HTTP client with timeout
+       httpClient := &http.Client{
+               Timeout: 30 * time.Second,
+       }
+
+       // Send request
+       resp, err := httpClient.Do(httpReq)
+       if err != nil {
+               return fmt.Errorf("failed to send message to agent %s: %v", 
agent.AgentID, err)
+       }
+       defer resp.Body.Close()
+
+       // Check response status
+       if resp.StatusCode != 200 {
+               bodyBytes, _ := io.ReadAll(resp.Body)
+               return fmt.Errorf("agent %s returned status %d: %s", 
agent.AgentID, resp.StatusCode, string(bodyBytes))
+       }
+
+       // Parse response
+       var jsonrpcResp JSONRPCResponse
+       if err := json.NewDecoder(resp.Body).Decode(&jsonrpcResp); err != nil {
+               return fmt.Errorf("failed to parse response from agent %s: %v", 
agent.AgentID, err)
+       }
+
+       // Check for JSON-RPC error
+       if jsonrpcResp.Error != nil {
+               return fmt.Errorf("agent %s returned error: %s", agent.AgentID, 
jsonrpcResp.Error.Message)
+       }
+
+       logger.Infof("[dubbo-go-pixiu] A2A Server successfully forwarded 
message %s to agent %s", message.MessageID, agent.AgentID)
+       return nil
+}
diff --git a/pkg/filter/a2a/filter_test.go b/pkg/filter/a2a/filter_test.go
new file mode 100644
index 00000000..38f67a6c
--- /dev/null
+++ b/pkg/filter/a2a/filter_test.go
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package a2a
+
+import (
+       "bytes"
+       "encoding/json"
+       "net/http"
+       "net/http/httptest"
+       "testing"
+)
+
+import (
+       "github.com/stretchr/testify/assert"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
+       "github.com/apache/dubbo-go-pixiu/pkg/context/mock"
+)
+
+// createTestConfig creates a test configuration
+func createTestConfig() *Config {
+       return &Config{
+               Endpoint: "/a2a",
+               AgentInfo: &AgentConfig{
+                       AgentID:     "test-agent",
+                       Name:        "Test Agent",
+                       Version:     "1.0.0",
+                       Description: "Test agent for unit tests",
+                       Endpoint:    "http://localhost:8888/a2a";,
+                       Status:      StatusOnline,
+                       Capabilities: []CapabilityConfig{
+                               {
+                                       Name:        "test_capability",
+                                       Description: "Test capability",
+                                       Tags:        []string{"test"},
+                               },
+                       },
+               },
+               KnownAgents: []AgentConfig{
+                       {
+                               AgentID:  "agent-1",
+                               Name:     "Agent 1",
+                               Version:  "1.0.0",
+                               Endpoint: "http://localhost:9001/a2a";,
+                               Status:   StatusOnline,
+                       },
+               },
+               TaskConfig: &TaskConfig{
+                       DefaultTimeout:     5000,
+                       MaxConcurrentTasks: 10,
+                       CleanupInterval:    1000,
+                       TaskRetention:      5000,
+               },
+       }
+}
+
+// createTestFilterFactory creates a filter factory for testing
+func createTestFilterFactory() *FilterFactory {
+       factory := &FilterFactory{
+               cfg: createTestConfig(),
+       }
+       factory.Apply()
+       return factory
+}
+
+// buildJSONRPCRequest builds a JSON-RPC request body
+func buildJSONRPCRequest(method string, params any) []byte {
+       req := JSONRPCRequest{
+               JSONRPC: JSONRPCVersion,
+               Method:  method,
+               Params:  params,
+               ID:      1,
+       }
+       body, _ := json.Marshal(req)
+       return body
+}
+
+// TestFilterFactory_Apply tests filter factory initialization
+func TestFilterFactory_Apply(t *testing.T) {
+       factory := createTestFilterFactory()
+
+       assert.NotNil(t, factory.agentMap)
+       assert.NotNil(t, factory.taskManager)
+       assert.Equal(t, 2, len(factory.agentMap)) // test-agent + agent-1
+}
+
+// TestA2AFilter_HandlePing tests the ping method
+func TestA2AFilter_HandlePing(t *testing.T) {
+       factory := createTestFilterFactory()
+
+       reqBody := buildJSONRPCRequest(MethodPing, nil)
+       request, err := http.NewRequest("POST", "http://localhost:8888/a2a";, 
bytes.NewReader(reqBody))
+       assert.NoError(t, err)
+       request.Header.Set("Content-Type", "application/json")
+
+       ctx := mock.GetMockHTTPContext(request)
+       f := &A2AFilter{
+               cfg:         factory.cfg,
+               agentMap:    factory.agentMap,
+               taskManager: factory.taskManager,
+               mutex:       &factory.mutex,
+       }
+
+       status := f.Decode(ctx)
+       assert.Equal(t, filter.Stop, status)
+
+       // Verify response was written (we can't easily check the actual 
response in this test setup)
+       // In a real scenario, you'd inspect ctx.Writer or use 
httptest.ResponseRecorder
+}
+
+// TestA2AFilter_HandleGetAgentCard tests the get_agent_card method
+func TestA2AFilter_HandleGetAgentCard(t *testing.T) {
+       factory := createTestFilterFactory()
+
+       reqBody := buildJSONRPCRequest(MethodGetAgentCard, nil)
+       request, err := http.NewRequest("POST", "http://localhost:8888/a2a";, 
bytes.NewReader(reqBody))
+       assert.NoError(t, err)
+       request.Header.Set("Content-Type", "application/json")
+
+       ctx := mock.GetMockHTTPContext(request)
+       f := &A2AFilter{
+               cfg:         factory.cfg,
+               agentMap:    factory.agentMap,
+               taskManager: factory.taskManager,
+               mutex:       &factory.mutex,
+       }
+
+       status := f.Decode(ctx)
+       assert.Equal(t, filter.Stop, status)
+}
+
+// TestA2AFilter_HandleDiscoverAgents tests the discover_agents method
+func TestA2AFilter_HandleDiscoverAgents(t *testing.T) {
+       factory := createTestFilterFactory()
+
+       reqBody := buildJSONRPCRequest(MethodDiscoverAgents, nil)
+       request, err := http.NewRequest("POST", "http://localhost:8888/a2a";, 
bytes.NewReader(reqBody))
+       assert.NoError(t, err)
+       request.Header.Set("Content-Type", "application/json")
+
+       ctx := mock.GetMockHTTPContext(request)
+       f := &A2AFilter{
+               cfg:         factory.cfg,
+               agentMap:    factory.agentMap,
+               taskManager: factory.taskManager,
+               mutex:       &factory.mutex,
+       }
+
+       status := f.Decode(ctx)
+       assert.Equal(t, filter.Stop, status)
+}
+
+// TestA2AFilter_HandleCreateTask tests task creation
+func TestA2AFilter_HandleCreateTask(t *testing.T) {
+       factory := createTestFilterFactory()
+
+       params := CreateTaskRequest{
+               To:   "agent-1",
+               Type: "test_task",
+               Content: map[string]any{
+                       "message": "test",
+               },
+               Timeout: 3000,
+       }
+
+       reqBody := buildJSONRPCRequest(MethodCreateTask, params)
+       request, err := http.NewRequest("POST", "http://localhost:8888/a2a";, 
bytes.NewReader(reqBody))
+       assert.NoError(t, err)
+       request.Header.Set("Content-Type", "application/json")
+
+       ctx := mock.GetMockHTTPContext(request)
+       f := &A2AFilter{
+               cfg:         factory.cfg,
+               agentMap:    factory.agentMap,
+               taskManager: factory.taskManager,
+               mutex:       &factory.mutex,
+       }
+
+       status := f.Decode(ctx)
+       assert.Equal(t, filter.Stop, status)
+
+       // Verify task was created
+       assert.Equal(t, 1, factory.taskManager.GetTaskCount())
+}
+
+// TestA2AFilter_HandleCreateTask_MissingParams tests task creation with 
missing parameters
+func TestA2AFilter_HandleCreateTask_MissingParams(t *testing.T) {
+       factory := createTestFilterFactory()
+
+       // Missing required fields
+       params := CreateTaskRequest{
+               To: "agent-1",
+               // Missing Type and Content
+       }
+
+       reqBody := buildJSONRPCRequest(MethodCreateTask, params)
+       request, err := http.NewRequest("POST", "http://localhost:8888/a2a";, 
bytes.NewReader(reqBody))
+       assert.NoError(t, err)
+       request.Header.Set("Content-Type", "application/json")
+
+       ctx := mock.GetMockHTTPContext(request)
+       f := &A2AFilter{
+               cfg:         factory.cfg,
+               agentMap:    factory.agentMap,
+               taskManager: factory.taskManager,
+               mutex:       &factory.mutex,
+       }
+
+       status := f.Decode(ctx)
+       assert.Equal(t, filter.Stop, status)
+}
+
+// TestA2AFilter_HandleSendMessage tests sending a message with mock server
+func TestA2AFilter_HandleSendMessage(t *testing.T) {
+       // Create a mock agent server
+       mockServer := httptest.NewServer(http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) {
+               // Simulate agent receiving message
+               response := JSONRPCResponse{
+                       JSONRPC: JSONRPCVersion,
+                       ID:      1,
+                       Result:  map[string]string{"status": "received"},
+               }
+               json.NewEncoder(w).Encode(response)
+       }))
+       defer mockServer.Close()
+
+       // Create factory with mock server endpoint
+       factory := createTestFilterFactory()
+       factory.agentMap["agent-1"].Endpoint = mockServer.URL
+
+       params := SendMessageRequest{
+               To:   "agent-1",
+               Type: "test_message",
+               Content: map[string]any{
+                       "text": "Hello, agent!",
+               },
+       }
+
+       reqBody := buildJSONRPCRequest(MethodSendMessage, params)
+       request, err := http.NewRequest("POST", "http://localhost:8888/a2a";, 
bytes.NewReader(reqBody))
+       assert.NoError(t, err)
+       request.Header.Set("Content-Type", "application/json")
+
+       ctx := mock.GetMockHTTPContext(request)
+       f := &A2AFilter{
+               cfg:         factory.cfg,
+               agentMap:    factory.agentMap,
+               taskManager: factory.taskManager,
+               mutex:       &factory.mutex,
+       }
+
+       status := f.Decode(ctx)
+       assert.Equal(t, filter.Stop, status)
+}
+
+// TestA2AFilter_HandleSendMessage_AgentNotFound tests sending message to 
non-existent agent
+func TestA2AFilter_HandleSendMessage_AgentNotFound(t *testing.T) {
+       factory := createTestFilterFactory()
+
+       params := SendMessageRequest{
+               To:   "non-existent-agent",
+               Type: "test_message",
+               Content: map[string]any{
+                       "text": "Hello",
+               },
+       }
+
+       reqBody := buildJSONRPCRequest(MethodSendMessage, params)
+       request, err := http.NewRequest("POST", "http://localhost:8888/a2a";, 
bytes.NewReader(reqBody))
+       assert.NoError(t, err)
+       request.Header.Set("Content-Type", "application/json")
+
+       ctx := mock.GetMockHTTPContext(request)
+       f := &A2AFilter{
+               cfg:         factory.cfg,
+               agentMap:    factory.agentMap,
+               taskManager: factory.taskManager,
+               mutex:       &factory.mutex,
+       }
+
+       status := f.Decode(ctx)
+       assert.Equal(t, filter.Stop, status)
+}
+
+// TestA2AFilter_NonA2ARequest tests that non-A2A requests are ignored
+func TestA2AFilter_NonA2ARequest(t *testing.T) {
+       factory := createTestFilterFactory()
+
+       request, err := http.NewRequest("GET", 
"http://localhost:8888/other-path";, nil)
+       assert.NoError(t, err)
+
+       ctx := mock.GetMockHTTPContext(request)
+       f := &A2AFilter{
+               cfg:         factory.cfg,
+               agentMap:    factory.agentMap,
+               taskManager: factory.taskManager,
+               mutex:       &factory.mutex,
+       }
+
+       status := f.Decode(ctx)
+       assert.Equal(t, filter.Continue, status)
+}
+
+// TestA2AFilter_InvalidJSON tests handling of invalid JSON
+func TestA2AFilter_InvalidJSON(t *testing.T) {
+       factory := createTestFilterFactory()
+
+       request, err := http.NewRequest("POST", "http://localhost:8888/a2a";, 
bytes.NewReader([]byte("invalid json")))
+       assert.NoError(t, err)
+       request.Header.Set("Content-Type", "application/json")
+
+       ctx := mock.GetMockHTTPContext(request)
+       f := &A2AFilter{
+               cfg:         factory.cfg,
+               agentMap:    factory.agentMap,
+               taskManager: factory.taskManager,
+               mutex:       &factory.mutex,
+       }
+
+       status := f.Decode(ctx)
+       assert.Equal(t, filter.Stop, status)
+}
+
+// TestA2AFilter_UnknownMethod tests handling of unknown methods
+func TestA2AFilter_UnknownMethod(t *testing.T) {
+       factory := createTestFilterFactory()
+
+       reqBody := buildJSONRPCRequest("a2a.unknown_method", nil)
+       request, err := http.NewRequest("POST", "http://localhost:8888/a2a";, 
bytes.NewReader(reqBody))
+       assert.NoError(t, err)
+       request.Header.Set("Content-Type", "application/json")
+
+       ctx := mock.GetMockHTTPContext(request)
+       f := &A2AFilter{
+               cfg:         factory.cfg,
+               agentMap:    factory.agentMap,
+               taskManager: factory.taskManager,
+               mutex:       &factory.mutex,
+       }
+
+       status := f.Decode(ctx)
+       assert.Equal(t, filter.Stop, status)
+}
diff --git a/pkg/filter/a2a/plugin.go b/pkg/filter/a2a/plugin.go
new file mode 100644
index 00000000..651df514
--- /dev/null
+++ b/pkg/filter/a2a/plugin.go
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package a2a
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
+)
+
+func init() {
+       filter.RegisterHttpFilter(&Plugin{})
+}
+
+// Plugin implements filter.HttpFilterPlugin interface for A2A Server Filter
+type Plugin struct{}
+
+// Kind returns the plugin type identifier
+func (p *Plugin) Kind() string {
+       return Kind
+}
+
+// CreateFilterFactory creates a new FilterFactory instance
+func (p *Plugin) CreateFilterFactory() (filter.HttpFilterFactory, error) {
+       return &FilterFactory{cfg: &Config{}}, nil
+}
diff --git a/pkg/filter/a2a/task_manager.go b/pkg/filter/a2a/task_manager.go
new file mode 100644
index 00000000..f6be133e
--- /dev/null
+++ b/pkg/filter/a2a/task_manager.go
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package a2a
+
+import (
+       "fmt"
+       "sync"
+       "time"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+// TaskManager manages tasks in memory (simplified implementation for phase 1)
+type TaskManager struct {
+       config        *TaskConfig
+       tasks         map[string]*Task
+       mutex         sync.RWMutex
+       idCounter     int64
+       stopCh        chan struct{}
+       cleanupTicker *time.Ticker
+}
+
+// NewTaskManager creates a new task manager instance
+func NewTaskManager(config *TaskConfig) *TaskManager {
+       tm := &TaskManager{
+               config:    config,
+               tasks:     make(map[string]*Task),
+               idCounter: 1,
+               stopCh:    make(chan struct{}),
+       }
+
+       // Start cleanup routine if cleanup interval is configured
+       if config.CleanupInterval > 0 {
+               tm.cleanupTicker = 
time.NewTicker(time.Duration(config.CleanupInterval) * time.Millisecond)
+               go tm.cleanupRoutine()
+               logger.Infof("[dubbo-go-pixiu] A2A TaskManager cleanup routine 
started with interval %d ms", config.CleanupInterval)
+       }
+
+       return tm
+}
+
+// CreateTask creates a new task
+func (tm *TaskManager) CreateTask(from, to, taskType string, content 
map[string]any, timeout int64) (*Task, error) {
+       tm.mutex.Lock()
+       defer tm.mutex.Unlock()
+
+       // Check concurrent task limit
+       if len(tm.tasks) >= tm.config.MaxConcurrentTasks {
+               return nil, fmt.Errorf("maximum concurrent tasks limit reached: 
%d", tm.config.MaxConcurrentTasks)
+       }
+
+       // Generate task ID
+       taskID := fmt.Sprintf("task_%d_%d", time.Now().UnixMilli(), 
tm.idCounter)
+       tm.idCounter++
+
+       // Use default timeout if not specified
+       if timeout <= 0 {
+               timeout = tm.config.DefaultTimeout
+       }
+
+       now := time.Now().UnixMilli()
+       task := &Task{
+               TaskID:    taskID,
+               From:      from,
+               To:        to,
+               Type:      taskType,
+               Content:   content,
+               Status:    TaskPending,
+               CreatedAt: now,
+               UpdatedAt: now,
+               Timeout:   timeout,
+       }
+
+       tm.tasks[taskID] = task
+       logger.Infof("[dubbo-go-pixiu] A2A TaskManager created task %s: %s -> 
%s (%s)", taskID, from, to, taskType)
+
+       return task, nil
+}
+
+// GetTask retrieves a task by ID
+func (tm *TaskManager) GetTask(taskID string) (*Task, error) {
+       tm.mutex.RLock()
+       defer tm.mutex.RUnlock()
+
+       task, exists := tm.tasks[taskID]
+       if !exists {
+               return nil, fmt.Errorf("task not found: %s", taskID)
+       }
+
+       // Return a copy to prevent external modification
+       taskCopy := *task
+       return &taskCopy, nil
+}
+
+// UpdateTask updates task status and result
+func (tm *TaskManager) UpdateTask(taskID string, status TaskStatus, result 
map[string]any, errorMsg string) error {
+       tm.mutex.Lock()
+       defer tm.mutex.Unlock()
+
+       task, exists := tm.tasks[taskID]
+       if !exists {
+               return fmt.Errorf("task not found: %s", taskID)
+       }
+
+       // Update task fields
+       task.Status = status
+       task.UpdatedAt = time.Now().UnixMilli()
+
+       if result != nil {
+               task.Result = result
+       }
+
+       if errorMsg != "" {
+               task.Error = errorMsg
+       }
+
+       logger.Infof("[dubbo-go-pixiu] A2A TaskManager updated task %s status 
to %s", taskID, status)
+       return nil
+}
+
+// CancelTask cancels a task
+func (tm *TaskManager) CancelTask(taskID string) error {
+       return tm.UpdateTask(taskID, TaskCancelled, nil, "Task canceled")
+}
+
+// ListTasks returns all tasks (for debugging purposes)
+func (tm *TaskManager) ListTasks() map[string]*Task {
+       tm.mutex.RLock()
+       defer tm.mutex.RUnlock()
+
+       // Return copies to prevent external modification
+       result := make(map[string]*Task, len(tm.tasks))
+       for id, task := range tm.tasks {
+               taskCopy := *task
+               result[id] = &taskCopy
+       }
+
+       return result
+}
+
+// GetTaskCount returns the current number of tasks
+func (tm *TaskManager) GetTaskCount() int {
+       tm.mutex.RLock()
+       defer tm.mutex.RUnlock()
+       return len(tm.tasks)
+}
+
+// Stop stops the task manager and cleanup routines
+func (tm *TaskManager) Stop() {
+       if tm.cleanupTicker != nil {
+               tm.cleanupTicker.Stop()
+       }
+       close(tm.stopCh)
+       logger.Infof("[dubbo-go-pixiu] A2A TaskManager stopped")
+}
+
+// cleanupRoutine periodically cleans up completed and expired tasks
+func (tm *TaskManager) cleanupRoutine() {
+       for {
+               select {
+               case <-tm.cleanupTicker.C:
+                       tm.cleanupExpiredTasks()
+               case <-tm.stopCh:
+                       return
+               }
+       }
+}
+
+// cleanupExpiredTasks removes expired and old completed tasks
+func (tm *TaskManager) cleanupExpiredTasks() {
+       tm.mutex.Lock()
+       defer tm.mutex.Unlock()
+
+       now := time.Now().UnixMilli()
+       var toDelete []string
+
+       for taskID, task := range tm.tasks {
+               shouldDelete := false
+
+               // Check if task is expired (timeout exceeded)
+               if task.Status == TaskPending || task.Status == TaskRunning {
+                       if now-task.CreatedAt > task.Timeout {
+                               task.Status = TaskFailed
+                               task.Error = "Task timeout"
+                               task.UpdatedAt = now
+                               logger.Warnf("[dubbo-go-pixiu] A2A TaskManager 
task %s timed out", taskID)
+                       }
+               }
+
+               // Check if completed task should be cleaned up
+               if task.Status == TaskCompleted || task.Status == TaskFailed || 
task.Status == TaskCancelled {
+                       if now-task.UpdatedAt > tm.config.TaskRetention {
+                               shouldDelete = true
+                       }
+               }
+
+               if shouldDelete {
+                       toDelete = append(toDelete, taskID)
+               }
+       }
+
+       // Delete expired tasks
+       for _, taskID := range toDelete {
+               delete(tm.tasks, taskID)
+               logger.Debugf("[dubbo-go-pixiu] A2A TaskManager cleaned up task 
%s", taskID)
+       }
+
+       if len(toDelete) > 0 {
+               logger.Infof("[dubbo-go-pixiu] A2A TaskManager cleaned up %d 
expired tasks", len(toDelete))
+       }
+}
diff --git a/pkg/filter/a2a/task_manager_test.go 
b/pkg/filter/a2a/task_manager_test.go
new file mode 100644
index 00000000..06e8d802
--- /dev/null
+++ b/pkg/filter/a2a/task_manager_test.go
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package a2a
+
+import (
+       "testing"
+       "time"
+)
+
+import (
+       "github.com/stretchr/testify/assert"
+)
+
+// TestTaskManager_CreateAndGet tests basic task creation and retrieval
+func TestTaskManager_CreateAndGet(t *testing.T) {
+       config := &TaskConfig{
+               DefaultTimeout:     5000,
+               MaxConcurrentTasks: 10,
+               CleanupInterval:    0, // Disable cleanup for this test
+       }
+
+       tm := NewTaskManager(config)
+       defer tm.Stop()
+
+       // Create a task
+       task, err := tm.CreateTask("agent-1", "agent-2", "test_task", 
map[string]any{
+               "message": "test",
+       }, 3000)
+
+       assert.NoError(t, err)
+       assert.NotNil(t, task)
+       assert.Equal(t, "agent-1", task.From)
+       assert.Equal(t, "agent-2", task.To)
+       assert.Equal(t, "test_task", task.Type)
+       assert.Equal(t, TaskPending, task.Status)
+
+       // Get the task
+       retrievedTask, err := tm.GetTask(task.TaskID)
+       assert.NoError(t, err)
+       assert.Equal(t, task.TaskID, retrievedTask.TaskID)
+       assert.Equal(t, task.From, retrievedTask.From)
+}
+
+// TestTaskManager_UpdateTask tests task updates
+func TestTaskManager_UpdateTask(t *testing.T) {
+       config := &TaskConfig{
+               DefaultTimeout:     5000,
+               MaxConcurrentTasks: 10,
+               CleanupInterval:    0,
+       }
+
+       tm := NewTaskManager(config)
+       defer tm.Stop()
+
+       // Create a task
+       task, err := tm.CreateTask("agent-1", "agent-2", "test_task", 
map[string]any{
+               "data": "test",
+       }, 0)
+       assert.NoError(t, err)
+
+       // Update the task
+       result := map[string]any{
+               "output": "success",
+       }
+       err = tm.UpdateTask(task.TaskID, TaskCompleted, result, "")
+       assert.NoError(t, err)
+
+       // Verify update
+       updatedTask, err := tm.GetTask(task.TaskID)
+       assert.NoError(t, err)
+       assert.Equal(t, TaskCompleted, updatedTask.Status)
+       assert.NotNil(t, updatedTask.Result)
+       assert.Equal(t, "success", updatedTask.Result["output"])
+}
+
+// TestTaskManager_CancelTask tests task cancellation
+func TestTaskManager_CancelTask(t *testing.T) {
+       config := &TaskConfig{
+               DefaultTimeout:     5000,
+               MaxConcurrentTasks: 10,
+               CleanupInterval:    0,
+       }
+
+       tm := NewTaskManager(config)
+       defer tm.Stop()
+
+       // Create a task
+       task, err := tm.CreateTask("agent-1", "agent-2", "test_task", 
map[string]any{}, 0)
+       assert.NoError(t, err)
+
+       // Cancel the task
+       err = tm.CancelTask(task.TaskID)
+       assert.NoError(t, err)
+
+       // Verify cancellation
+       cancelledTask, err := tm.GetTask(task.TaskID)
+       assert.NoError(t, err)
+       assert.Equal(t, TaskCancelled, cancelledTask.Status)
+}
+
+// TestTaskManager_ConcurrentLimit tests the concurrent task limit
+func TestTaskManager_ConcurrentLimit(t *testing.T) {
+       config := &TaskConfig{
+               DefaultTimeout:     5000,
+               MaxConcurrentTasks: 3, // Small limit for testing
+               CleanupInterval:    0,
+       }
+
+       tm := NewTaskManager(config)
+       defer tm.Stop()
+
+       // Create tasks up to the limit
+       for i := 0; i < 3; i++ {
+               _, err := tm.CreateTask("agent-1", "agent-2", "test_task", 
map[string]any{}, 0)
+               assert.NoError(t, err)
+       }
+
+       // Try to create one more task (should fail)
+       _, err := tm.CreateTask("agent-1", "agent-2", "test_task", 
map[string]any{}, 0)
+       assert.Error(t, err)
+       assert.Contains(t, err.Error(), "maximum concurrent tasks limit 
reached")
+}
+
+// TestTaskManager_GetTaskNotFound tests getting a non-existent task
+func TestTaskManager_GetTaskNotFound(t *testing.T) {
+       config := &TaskConfig{
+               DefaultTimeout:     5000,
+               MaxConcurrentTasks: 10,
+               CleanupInterval:    0,
+       }
+
+       tm := NewTaskManager(config)
+       defer tm.Stop()
+
+       // Try to get a non-existent task
+       _, err := tm.GetTask("non-existent-task-id")
+       assert.Error(t, err)
+       assert.Contains(t, err.Error(), "task not found")
+}
+
+// TestTaskManager_ListTasks tests listing all tasks
+func TestTaskManager_ListTasks(t *testing.T) {
+       config := &TaskConfig{
+               DefaultTimeout:     5000,
+               MaxConcurrentTasks: 10,
+               CleanupInterval:    0,
+       }
+
+       tm := NewTaskManager(config)
+       defer tm.Stop()
+
+       // Create multiple tasks
+       task1, _ := tm.CreateTask("agent-1", "agent-2", "task1", 
map[string]any{}, 0)
+       task2, _ := tm.CreateTask("agent-1", "agent-3", "task2", 
map[string]any{}, 0)
+
+       // List all tasks
+       tasks := tm.ListTasks()
+       assert.Equal(t, 2, len(tasks))
+       assert.NotNil(t, tasks[task1.TaskID])
+       assert.NotNil(t, tasks[task2.TaskID])
+}
+
+// TestTaskManager_Cleanup tests the cleanup mechanism
+func TestTaskManager_Cleanup(t *testing.T) {
+       config := &TaskConfig{
+               DefaultTimeout:     100, // Very short timeout
+               MaxConcurrentTasks: 10,
+               CleanupInterval:    50,  // Frequent cleanup
+               TaskRetention:      100, // Short retention
+       }
+
+       tm := NewTaskManager(config)
+       defer tm.Stop()
+
+       // Create a task
+       task, err := tm.CreateTask("agent-1", "agent-2", "test_task", 
map[string]any{}, 100)
+       assert.NoError(t, err)
+
+       // Wait for timeout
+       time.Sleep(150 * time.Millisecond)
+
+       // Manually trigger cleanup
+       tm.cleanupExpiredTasks()
+
+       // Task should be marked as failed due to timeout
+       timedOutTask, err := tm.GetTask(task.TaskID)
+       assert.NoError(t, err)
+       assert.Equal(t, TaskFailed, timedOutTask.Status)
+       assert.Contains(t, timedOutTask.Error, "timeout")
+}
+
+// TestTaskManager_GetTaskCount tests the task count functionality
+func TestTaskManager_GetTaskCount(t *testing.T) {
+       config := &TaskConfig{
+               DefaultTimeout:     5000,
+               MaxConcurrentTasks: 10,
+               CleanupInterval:    0,
+       }
+
+       tm := NewTaskManager(config)
+       defer tm.Stop()
+
+       assert.Equal(t, 0, tm.GetTaskCount())
+
+       // Create tasks
+       tm.CreateTask("agent-1", "agent-2", "task1", map[string]any{}, 0)
+       assert.Equal(t, 1, tm.GetTaskCount())
+
+       tm.CreateTask("agent-1", "agent-3", "task2", map[string]any{}, 0)
+       assert.Equal(t, 2, tm.GetTaskCount())
+}
diff --git a/pkg/filter/a2a/types.go b/pkg/filter/a2a/types.go
new file mode 100644
index 00000000..a2aefeab
--- /dev/null
+++ b/pkg/filter/a2a/types.go
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package a2a
+
+// JSONRPCRequest represents a JSON-RPC 2.0 request
+type JSONRPCRequest struct {
+       JSONRPC string `json:"jsonrpc"`
+       Method  string `json:"method"`
+       Params  any    `json:"params,omitempty"`
+       ID      any    `json:"id"`
+}
+
+// JSONRPCResponse represents a JSON-RPC 2.0 response
+type JSONRPCResponse struct {
+       JSONRPC string    `json:"jsonrpc"`
+       Result  any       `json:"result,omitempty"`
+       Error   *RPCError `json:"error,omitempty"`
+       ID      any       `json:"id"`
+}
+
+// RPCError represents a JSON-RPC error object
+type RPCError struct {
+       Code    int    `json:"code"`
+       Message string `json:"message"`
+       Data    any    `json:"data,omitempty"`
+}
+
+// AgentStatus represents the status of an agent
+type AgentStatus string
+
+const (
+       StatusOnline  AgentStatus = AgentStatusOnline
+       StatusOffline AgentStatus = AgentStatusOffline
+       StatusBusy    AgentStatus = AgentStatusBusy
+)
+
+// TaskStatus represents the status of a task
+type TaskStatus string
+
+const (
+       TaskPending   TaskStatus = TaskStatusPending
+       TaskRunning   TaskStatus = TaskStatusRunning
+       TaskCompleted TaskStatus = TaskStatusCompleted
+       TaskFailed    TaskStatus = TaskStatusFailed
+       TaskCancelled TaskStatus = TaskStatusCancelled
+)
+
+// AgentInfo represents information about an agent
+type AgentInfo struct {
+       AgentID      string         `json:"agent_id"`
+       Name         string         `json:"name"`
+       Version      string         `json:"version"`
+       Description  string         `json:"description,omitempty"`
+       Endpoint     string         `json:"endpoint"`
+       Status       AgentStatus    `json:"status"`
+       Capabilities []Capability   `json:"capabilities,omitempty"`
+       Metadata     map[string]any `json:"metadata,omitempty"`
+}
+
+// Capability represents a capability that an agent provides
+type Capability struct {
+       Name        string      `json:"name"`
+       Description string      `json:"description,omitempty"`
+       InputTypes  []string    `json:"input_types,omitempty"`
+       OutputTypes []string    `json:"output_types,omitempty"`
+       Tags        []string    `json:"tags,omitempty"`
+       Parameters  []Parameter `json:"parameters,omitempty"`
+}
+
+// Parameter represents a capability parameter
+type Parameter struct {
+       Name        string `json:"name"`
+       Type        string `json:"type"`
+       Description string `json:"description,omitempty"`
+       Required    bool   `json:"required"`
+       Default     any    `json:"default,omitempty"`
+}
+
+// Task represents a task that can be executed by an agent
+type Task struct {
+       TaskID    string         `json:"task_id"`
+       From      string         `json:"from"`
+       To        string         `json:"to"`
+       Type      string         `json:"type"`
+       Content   map[string]any `json:"content"`
+       Status    TaskStatus     `json:"status"`
+       Result    map[string]any `json:"result,omitempty"`
+       Error     string         `json:"error,omitempty"`
+       CreatedAt int64          `json:"created_at"`
+       UpdatedAt int64          `json:"updated_at"`
+       Timeout   int64          `json:"timeout,omitempty"`
+}
+
+// Message represents a message sent between agents
+type Message struct {
+       MessageID string         `json:"message_id"`
+       From      string         `json:"from"`
+       To        string         `json:"to"`
+       Type      string         `json:"type"`
+       Content   map[string]any `json:"content"`
+       Timestamp int64          `json:"timestamp"`
+}
+
+// PingRequest represents a ping request
+type PingRequest struct {
+       Timestamp int64 `json:"timestamp,omitempty"`
+}
+
+// PingResponse represents a ping response
+type PingResponse struct {
+       Status    string `json:"status"`
+       AgentID   string `json:"agent_id"`
+       Timestamp int64  `json:"timestamp"`
+}
+
+// DiscoverAgentsRequest represents a request to discover agents
+type DiscoverAgentsRequest struct {
+       Capabilities []string          `json:"capabilities,omitempty"`
+       Tags         []string          `json:"tags,omitempty"`
+       Status       AgentStatus       `json:"status,omitempty"`
+       Filters      map[string]string `json:"filters,omitempty"`
+}
+
+// DiscoverAgentsResponse represents the response to an agent discovery request
+type DiscoverAgentsResponse struct {
+       Agents []AgentInfo `json:"agents"`
+}
+
+// CreateTaskRequest represents a request to create a task
+type CreateTaskRequest struct {
+       To      string         `json:"to"`
+       Type    string         `json:"type"`
+       Content map[string]any `json:"content"`
+       Timeout int64          `json:"timeout,omitempty"`
+}
+
+// CreateTaskResponse represents the response to a create task request
+type CreateTaskResponse struct {
+       TaskID string     `json:"task_id"`
+       Status TaskStatus `json:"status"`
+}
+
+// GetTaskStatusRequest represents a request to get task status
+type GetTaskStatusRequest struct {
+       TaskID string `json:"task_id"`
+}
+
+// GetTaskStatusResponse represents the response to a get task status request
+type GetTaskStatusResponse struct {
+       Task Task `json:"task"`
+}
+
+// UpdateTaskRequest represents a request to update a task
+type UpdateTaskRequest struct {
+       TaskID string         `json:"task_id"`
+       Status TaskStatus     `json:"status,omitempty"`
+       Result map[string]any `json:"result,omitempty"`
+       Error  string         `json:"error,omitempty"`
+}
+
+// UpdateTaskResponse represents the response to an update task request
+type UpdateTaskResponse struct {
+       Success bool   `json:"success"`
+       Message string `json:"message,omitempty"`
+}
+
+// SendMessageRequest represents a request to send a message
+type SendMessageRequest struct {
+       To      string         `json:"to"`
+       Type    string         `json:"type"`
+       Content map[string]any `json:"content"`
+}
+
+// SendMessageResponse represents the response to a send message request
+type SendMessageResponse struct {
+       MessageID string `json:"message_id"`
+       Success   bool   `json:"success"`
+       Message   string `json:"message,omitempty"`
+}
+
+// BroadcastMessageResponse represents the response to a broadcast message 
request
+type BroadcastMessageResponse struct {
+       TotalAgents  int      `json:"total_agents"`
+       SuccessCount int      `json:"success_count"`
+       FailedAgents []string `json:"failed_agents,omitempty"`
+       MessageIDs   []string `json:"message_ids"`
+}
diff --git a/pkg/pluginregistry/registry.go b/pkg/pluginregistry/registry.go
index 7fc267ea..2c7a6070 100644
--- a/pkg/pluginregistry/registry.go
+++ b/pkg/pluginregistry/registry.go
@@ -29,6 +29,7 @@ import (
        _ "github.com/apache/dubbo-go-pixiu/pkg/cluster/retry/countbased"
        _ 
"github.com/apache/dubbo-go-pixiu/pkg/cluster/retry/exponentialbackoff"
        _ "github.com/apache/dubbo-go-pixiu/pkg/cluster/retry/noretry"
+       _ "github.com/apache/dubbo-go-pixiu/pkg/filter/a2a"
        _ "github.com/apache/dubbo-go-pixiu/pkg/filter/accesslog"
        _ "github.com/apache/dubbo-go-pixiu/pkg/filter/auth/jwt"
        _ "github.com/apache/dubbo-go-pixiu/pkg/filter/auth/mcp"

Reply via email to