This is an automated email from the ASF dual-hosted git repository.
wuxinfan pushed a commit to branch a2a-2025
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
The following commit(s) were added to refs/heads/a2a-2025 by this push:
new 78989b19 feat(a2a): implement A2A Server Filter (#771)
78989b19 is described below
commit 78989b1914fb3a3d96f847c653368590b2a39534
Author: Zerui Yang <[email protected]>
AuthorDate: Thu Oct 30 15:02:52 2025 +0800
feat(a2a): implement A2A Server Filter (#771)
* feat(a2a): implement A2A Server Filter with configuration and task management
* refactor(filter): reorganize import statements for clarity
* fix: fix the err
* feat(filter): add A2A task and message management with tests
* fix: standardize cancellation terminology from "cancelled" to "canceled"
* fix: standardize cancellation terminology from "cancelled" to "canceled"
---
pkg/common/constant/key.go | 3 +
pkg/filter/a2a/config.go | 195 ++++++++
pkg/filter/a2a/constants.go | 82 ++++
pkg/filter/a2a/filter.go | 856 ++++++++++++++++++++++++++++++++++++
pkg/filter/a2a/filter_test.go | 360 +++++++++++++++
pkg/filter/a2a/plugin.go | 39 ++
pkg/filter/a2a/task_manager.go | 228 ++++++++++
pkg/filter/a2a/task_manager_test.go | 226 ++++++++++
pkg/filter/a2a/types.go | 202 +++++++++
pkg/pluginregistry/registry.go | 1 +
10 files changed, 2192 insertions(+)
diff --git a/pkg/common/constant/key.go b/pkg/common/constant/key.go
index 0400250c..09fa037e 100644
--- a/pkg/common/constant/key.go
+++ b/pkg/common/constant/key.go
@@ -61,6 +61,9 @@ const (
LLMTokenizerFilter = "dgp.filter.llm.tokenizer"
MCPServerFilter = "dgp.filter.mcp.mcpserver"
+
+ // A2A Filters
+ A2AServerFilter = "dgp.filter.a2a.server"
)
const (
diff --git a/pkg/filter/a2a/config.go b/pkg/filter/a2a/config.go
new file mode 100644
index 00000000..d01f3390
--- /dev/null
+++ b/pkg/filter/a2a/config.go
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package a2a
+
+// Config represents the configuration for the A2A Server Filter
+type Config struct {
+ // Endpoint specifies the A2A service endpoint path
+ Endpoint string `yaml:"endpoint" json:"endpoint" default:"/a2a"`
+
+ // AgentInfo contains information about the current agent
+ AgentInfo *AgentConfig `yaml:"agent_info" json:"agent_info"`
+
+ // KnownAgents contains a list of statically configured known agents
+ KnownAgents []AgentConfig `yaml:"known_agents"
json:"known_agents,omitempty"`
+
+ // TaskConfig contains task management related configuration
+ TaskConfig *TaskConfig `yaml:"task_config" json:"task_config,omitempty"`
+}
+
+// AgentConfig represents configuration for an agent
+type AgentConfig struct {
+ // AgentID is the unique identifier for the agent
+ AgentID string `yaml:"agent_id" json:"agent_id"`
+
+ // Name is the human-readable name of the agent
+ Name string `yaml:"name" json:"name"`
+
+ // Version is the version of the agent
+ Version string `yaml:"version" json:"version" default:"1.0.0"`
+
+ // Description is an optional description of the agent
+ Description string `yaml:"description" json:"description,omitempty"`
+
+ // Endpoint is the HTTP endpoint where the agent can be reached
+ Endpoint string `yaml:"endpoint" json:"endpoint"`
+
+ // Status is the current status of the agent
+ Status AgentStatus `yaml:"status" json:"status" default:"online"`
+
+ // Capabilities lists the capabilities that the agent provides
+ Capabilities []CapabilityConfig `yaml:"capabilities"
json:"capabilities,omitempty"`
+
+ // Metadata contains additional metadata about the agent
+ Metadata map[string]any `yaml:"metadata" json:"metadata,omitempty"`
+}
+
+// CapabilityConfig represents configuration for an agent capability
+type CapabilityConfig struct {
+ // Name is the unique name of the capability
+ Name string `yaml:"name" json:"name"`
+
+ // Description is an optional description of the capability
+ Description string `yaml:"description" json:"description,omitempty"`
+
+ // InputTypes specifies the types of input this capability accepts
+ InputTypes []string `yaml:"input_types" json:"input_types,omitempty"`
+
+ // OutputTypes specifies the types of output this capability produces
+ OutputTypes []string `yaml:"output_types" json:"output_types,omitempty"`
+
+ // Tags are labels that can be used to categorize and discover capabilities
+ Tags []string `yaml:"tags" json:"tags,omitempty"`
+
+ // Parameters defines the parameters that this capability accepts
+ Parameters []ParameterConfig `yaml:"parameters"
json:"parameters,omitempty"`
+}
+
+// ParameterConfig represents configuration for a capability parameter
+type ParameterConfig struct {
+ // Name is the parameter name
+ Name string `yaml:"name" json:"name"`
+
+ // Type is the parameter type (string, number, boolean, object, array)
+ Type string `yaml:"type" json:"type" default:"string"`
+
+ // Description is an optional description of the parameter
+ Description string `yaml:"description" json:"description,omitempty"`
+
+ // Required indicates whether this parameter is required
+ Required bool `yaml:"required" json:"required" default:"false"`
+
+ // Default is the default value for the parameter
+ Default any `yaml:"default" json:"default,omitempty"`
+
+ // Enum lists the allowed values for the parameter (if applicable)
+ Enum []any `yaml:"enum" json:"enum,omitempty"`
+}
+
+// TaskConfig represents configuration for task management
+type TaskConfig struct {
+ // DefaultTimeout is the default timeout for tasks in milliseconds
+ DefaultTimeout int64 `yaml:"default_timeout" json:"default_timeout"
default:"30000"`
+
+ // MaxConcurrentTasks is the maximum number of concurrent tasks
+ MaxConcurrentTasks int `yaml:"max_concurrent_tasks"
json:"max_concurrent_tasks" default:"100"`
+
+ // CleanupInterval is the interval for cleaning up completed/expired tasks in milliseconds
+ CleanupInterval int64 `yaml:"cleanup_interval" json:"cleanup_interval"
default:"300000"`
+
+ // TaskRetention is how long to keep completed tasks in milliseconds
+ TaskRetention int64 `yaml:"task_retention" json:"task_retention"
default:"3600000"`
+}
+
+// GetDefaultAgentInfo returns a default agent configuration
+func (c *Config) GetDefaultAgentInfo() *AgentInfo {
+ if c.AgentInfo == nil {
+ return &AgentInfo{
+ AgentID: "default-a2a-agent",
+ Name: "Default A2A Agent",
+ Version: DefaultAgentVersion,
+ Description: "A default A2A agent",
+ Endpoint: c.Endpoint,
+ Status: StatusOnline,
+ Capabilities: []Capability{},
+ Metadata: make(map[string]any),
+ }
+ }
+
+ return c.AgentInfo.ToAgentInfo(c.Endpoint)
+}
+
+// ToAgentInfo converts AgentConfig to AgentInfo
+func (ac *AgentConfig) ToAgentInfo(defaultEndpoint string) *AgentInfo {
+ endpoint := ac.Endpoint
+ if endpoint == "" {
+ endpoint = defaultEndpoint
+ }
+
+ capabilities := make([]Capability, len(ac.Capabilities))
+ for i, cap := range ac.Capabilities {
+ capabilities[i] = cap.ToCapability()
+ }
+
+ return &AgentInfo{
+ AgentID: ac.AgentID,
+ Name: ac.Name,
+ Version: ac.Version,
+ Description: ac.Description,
+ Endpoint: endpoint,
+ Status: ac.Status,
+ Capabilities: capabilities,
+ Metadata: ac.Metadata,
+ }
+}
+
+// ToCapability converts CapabilityConfig to Capability
+func (cc *CapabilityConfig) ToCapability() Capability {
+ parameters := make([]Parameter, len(cc.Parameters))
+ for i, param := range cc.Parameters {
+ parameters[i] = Parameter{
+ Name: param.Name,
+ Type: param.Type,
+ Description: param.Description,
+ Required: param.Required,
+ Default: param.Default,
+ }
+ }
+
+ return Capability{
+ Name: cc.Name,
+ Description: cc.Description,
+ InputTypes: cc.InputTypes,
+ OutputTypes: cc.OutputTypes,
+ Tags: cc.Tags,
+ Parameters: parameters,
+ }
+}
+
+// GetTaskConfig returns the task configuration or default values
+func (c *Config) GetTaskConfig() *TaskConfig {
+ if c.TaskConfig == nil {
+ return &TaskConfig{
+ DefaultTimeout: DefaultTaskTimeout,
+ MaxConcurrentTasks: DefaultMaxConcurrentTasks,
+ CleanupInterval: DefaultTaskCleanupInterval,
+ TaskRetention: DefaultTaskCleanupInterval * 12, // 1 hour default
+ }
+ }
+ return c.TaskConfig
+}
diff --git a/pkg/filter/a2a/constants.go b/pkg/filter/a2a/constants.go
new file mode 100644
index 00000000..8a73b83b
--- /dev/null
+++ b/pkg/filter/a2a/constants.go
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package a2a
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+)
+
+const (
+ // Kind is the type identifier for A2A Server Filter
+ Kind = constant.A2AServerFilter
+
+ // DefaultEndpoint is the default A2A service endpoint
+ DefaultEndpoint = "/a2a"
+
+ // DefaultAgentVersion is the default agent version
+ DefaultAgentVersion = "1.0.0"
+
+ // JSON-RPC version constant
+ JSONRPCVersion = "2.0"
+
+ // A2A Protocol Methods
+
+ // Core methods
+ MethodPing = "a2a.ping"
+ MethodGetAgentCard = "a2a.get_agent_card"
+ MethodDiscoverAgents = "a2a.discover_agents"
+
+ // Task management methods
+ MethodCreateTask = "a2a.create_task"
+ MethodGetTaskStatus = "a2a.get_task_status"
+ MethodUpdateTask = "a2a.update_task"
+ MethodCancelTask = "a2a.cancel_task"
+
+ // Communication methods
+ MethodSendMessage = "a2a.send_message"
+ MethodBroadcastMessage = "a2a.broadcast_message"
+
+ // Agent status constants
+ AgentStatusOnline = "online"
+ AgentStatusOffline = "offline"
+ AgentStatusBusy = "busy"
+
+ // Task status constants
+ TaskStatusPending = "pending"
+ TaskStatusRunning = "running"
+ TaskStatusCompleted = "completed"
+ TaskStatusFailed = "failed"
+ TaskStatusCancelled = "canceled"
+
+ // Error codes for JSON-RPC errors
+ ErrorCodeParseError = -32700
+ ErrorCodeInvalidRequest = -32600
+ ErrorCodeMethodNotFound = -32601
+ ErrorCodeInvalidParams = -32602
+ ErrorCodeInternalError = -32603
+
+ // A2A specific error codes
+ ErrorCodeAgentNotFound = -32001
+ ErrorCodeTaskNotFound = -32002
+ ErrorCodeTaskTimeout = -32003
+
+ // Default configuration values
+ DefaultTaskTimeout = 30000 // 30 seconds in milliseconds
+ DefaultMaxConcurrentTasks = 100
+ DefaultTaskCleanupInterval = 300000 // 5 minutes in milliseconds
+)
diff --git a/pkg/filter/a2a/filter.go b/pkg/filter/a2a/filter.go
new file mode 100644
index 00000000..e737de44
--- /dev/null
+++ b/pkg/filter/a2a/filter.go
@@ -0,0 +1,856 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package a2a
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "strings"
+ "sync"
+ "time"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
+ contexthttp "github.com/apache/dubbo-go-pixiu/pkg/context/http"
+ "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+// FilterFactory is the factory for creating A2A filter instances
+type FilterFactory struct {
+ cfg *Config
+ agentMap map[string]*AgentInfo
+ taskManager *TaskManager
+ mutex sync.RWMutex
+}
+
+// Config returns the configuration struct for the factory
+func (f *FilterFactory) Config() any {
+ return f.cfg
+}
+
+// Apply initializes the filter factory from its configuration
+func (f *FilterFactory) Apply() error {
+ logger.Infof("[dubbo-go-pixiu] A2A Server Filter factory applying
configuration")
+
+ // Initialize agent map
+ f.agentMap = make(map[string]*AgentInfo)
+
+ // Load current agent info
+ agentInfo := f.cfg.GetDefaultAgentInfo()
+ f.agentMap[agentInfo.AgentID] = agentInfo
+
+ // Load known agents from configuration
+ for _, agentConfig := range f.cfg.KnownAgents {
+ agent := agentConfig.ToAgentInfo(f.cfg.Endpoint)
+ f.agentMap[agent.AgentID] = agent
+ logger.Infof("[dubbo-go-pixiu] A2A Server loaded known agent:
%s (%s)", agent.Name, agent.AgentID)
+ }
+
+ // Initialize task manager
+ taskConfig := f.cfg.GetTaskConfig()
+ f.taskManager = NewTaskManager(taskConfig)
+
+ logger.Infof("[dubbo-go-pixiu] A2A Server Filter factory initialized
with %d agents", len(f.agentMap))
+ return nil
+}
+
+// PrepareFilterChain prepares the filter chain for a new request
+func (f *FilterFactory) PrepareFilterChain(_ *contexthttp.HttpContext, chain
filter.FilterChain) error {
+ a2aFilter := &A2AFilter{
+ cfg: f.cfg,
+ agentMap: f.agentMap,
+ taskManager: f.taskManager,
+ mutex: &f.mutex,
+ }
+
+ chain.AppendDecodeFilters(a2aFilter)
+ return nil
+}
+
+// A2AFilter is the actual filter that processes A2A requests
+type A2AFilter struct {
+ cfg *Config
+ agentMap map[string]*AgentInfo
+ taskManager *TaskManager
+ mutex *sync.RWMutex
+}
+
+// Decode processes incoming HTTP requests for A2A protocol
+func (f *A2AFilter) Decode(ctx *contexthttp.HttpContext) filter.FilterStatus {
+ // Check if this is an A2A request
+ if !f.isA2ARequest(ctx) {
+ return filter.Continue
+ }
+
+ logger.Infof("[dubbo-go-pixiu] A2A Server processing request: %s %s",
ctx.Request.Method, ctx.Request.URL.Path)
+
+ // Read request body
+ body, err := io.ReadAll(ctx.Request.Body)
+ if err != nil {
+ logger.Errorf("[dubbo-go-pixiu] A2A Server failed to read
request body: %v", err)
+ f.sendErrorResponse(ctx, nil, ErrorCodeInternalError, "Failed
to read request body")
+ return filter.Stop
+ }
+
+ // Parse JSON-RPC request
+ var jsonrpcReq JSONRPCRequest
+ if err := json.Unmarshal(body, &jsonrpcReq); err != nil {
+ logger.Errorf("[dubbo-go-pixiu] A2A Server failed to parse
JSON-RPC request: %v", err)
+ f.sendErrorResponse(ctx, nil, ErrorCodeParseError, "Invalid
JSON-RPC request")
+ return filter.Stop
+ }
+
+ // Validate JSON-RPC version
+ if jsonrpcReq.JSONRPC != JSONRPCVersion {
+ logger.Warnf("[dubbo-go-pixiu] A2A Server invalid JSON-RPC
version: %s", jsonrpcReq.JSONRPC)
+ f.sendErrorResponse(ctx, jsonrpcReq.ID,
ErrorCodeInvalidRequest, "Invalid JSON-RPC version")
+ return filter.Stop
+ }
+
+ logger.Infof("[dubbo-go-pixiu] A2A Server received method: %s (id:
%v)", jsonrpcReq.Method, jsonrpcReq.ID)
+
+ // Process the A2A method
+ response := f.processA2AMethod(&jsonrpcReq)
+
+ // Send response
+ f.sendJSONResponse(ctx, response)
+ return filter.Stop
+}
+
+// isA2ARequest checks if the incoming request is an A2A request
+func (f *A2AFilter) isA2ARequest(ctx *contexthttp.HttpContext) bool {
+ // Check if the request path matches the A2A endpoint
+ if !strings.HasPrefix(ctx.Request.URL.Path, f.cfg.Endpoint) {
+ return false
+ }
+
+ // Check if it's a POST request with JSON content
+ if ctx.Request.Method != "POST" {
+ return false
+ }
+
+ contentType := ctx.Request.Header.Get("Content-Type")
+ return strings.Contains(contentType, "application/json")
+}
+
+// processA2AMethod processes different A2A protocol methods
+func (f *A2AFilter) processA2AMethod(req *JSONRPCRequest) *JSONRPCResponse {
+ switch req.Method {
+ case MethodPing:
+ return f.handlePing(req)
+ case MethodGetAgentCard:
+ return f.handleGetAgentCard(req)
+ case MethodDiscoverAgents:
+ return f.handleDiscoverAgents(req)
+ case MethodCreateTask:
+ return f.handleCreateTask(req)
+ case MethodGetTaskStatus:
+ return f.handleGetTaskStatus(req)
+ case MethodUpdateTask:
+ return f.handleUpdateTask(req)
+ case MethodCancelTask:
+ return f.handleCancelTask(req)
+ case MethodSendMessage:
+ return f.handleSendMessage(req)
+ case MethodBroadcastMessage:
+ return f.handleBroadcastMessage(req)
+ default:
+ logger.Warnf("[dubbo-go-pixiu] A2A Server unsupported method:
%s", req.Method)
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeMethodNotFound,
+ Message: "Method not found",
+ },
+ }
+ }
+}
+
+// Basic method handlers (placeholder implementations for phase 1)
+
+// handlePing handles ping requests
+func (f *A2AFilter) handlePing(req *JSONRPCRequest) *JSONRPCResponse {
+ f.mutex.RLock()
+ agentInfo := f.cfg.GetDefaultAgentInfo()
+ f.mutex.RUnlock()
+
+ response := PingResponse{
+ Status: "pong",
+ AgentID: agentInfo.AgentID,
+ Timestamp: time.Now().UnixMilli(),
+ }
+
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Result: response,
+ }
+}
+
+// handleGetAgentCard handles get agent card requests
+func (f *A2AFilter) handleGetAgentCard(req *JSONRPCRequest) *JSONRPCResponse {
+ f.mutex.RLock()
+ agentInfo := f.cfg.GetDefaultAgentInfo()
+ f.mutex.RUnlock()
+
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Result: agentInfo,
+ }
+}
+
+// handleDiscoverAgents handles agent discovery requests
+func (f *A2AFilter) handleDiscoverAgents(req *JSONRPCRequest) *JSONRPCResponse
{
+ f.mutex.RLock()
+ defer f.mutex.RUnlock()
+
+ // For now, return all known agents
+ var agents []AgentInfo
+ for _, agent := range f.agentMap {
+ agents = append(agents, *agent)
+ }
+
+ response := DiscoverAgentsResponse{
+ Agents: agents,
+ }
+
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Result: response,
+ }
+}
+
+// Placeholder implementations for other methods (to be implemented in later phases)
+
+func (f *A2AFilter) handleCreateTask(req *JSONRPCRequest) *JSONRPCResponse {
+ // Parse request parameters
+ paramsBytes, err := json.Marshal(req.Params)
+ if err != nil {
+ logger.Errorf("[dubbo-go-pixiu] A2A Server failed to marshal
create task params: %v", err)
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeInvalidParams,
+ Message: "Invalid parameters",
+ },
+ }
+ }
+
+ var params CreateTaskRequest
+ if err := json.Unmarshal(paramsBytes, ¶ms); err != nil {
+ logger.Errorf("[dubbo-go-pixiu] A2A Server failed to parse
create task params: %v", err)
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeInvalidParams,
+ Message: "Invalid task parameters",
+ },
+ }
+ }
+
+ // Validate required fields
+ if params.To == "" || params.Type == "" || params.Content == nil {
+ logger.Warnf("[dubbo-go-pixiu] A2A Server create task missing
required fields")
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeInvalidParams,
+ Message: "Missing required fields: to, type,
content",
+ },
+ }
+ }
+
+ // Get current agent info as the sender
+ f.mutex.RLock()
+ agentInfo := f.cfg.GetDefaultAgentInfo()
+ f.mutex.RUnlock()
+
+ // Create task
+ task, err := f.taskManager.CreateTask(agentInfo.AgentID, params.To,
params.Type, params.Content, params.Timeout)
+ if err != nil {
+ logger.Errorf("[dubbo-go-pixiu] A2A Server failed to create
task: %v", err)
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeInternalError,
+ Message: fmt.Sprintf("Failed to create task:
%v", err),
+ },
+ }
+ }
+
+ logger.Infof("[dubbo-go-pixiu] A2A Server created task %s: %s -> %s",
task.TaskID, task.From, task.To)
+
+ // Build response
+ response := CreateTaskResponse{
+ TaskID: task.TaskID,
+ Status: task.Status,
+ }
+
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Result: response,
+ }
+}
+
+func (f *A2AFilter) handleGetTaskStatus(req *JSONRPCRequest) *JSONRPCResponse {
+ // Parse request parameters
+ paramsBytes, err := json.Marshal(req.Params)
+ if err != nil {
+ logger.Errorf("[dubbo-go-pixiu] A2A Server failed to marshal
get task status params: %v", err)
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeInvalidParams,
+ Message: "Invalid parameters",
+ },
+ }
+ }
+
+ var params GetTaskStatusRequest
+ if err := json.Unmarshal(paramsBytes, ¶ms); err != nil {
+ logger.Errorf("[dubbo-go-pixiu] A2A Server failed to parse get
task status params: %v", err)
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeInvalidParams,
+ Message: "Invalid task status parameters",
+ },
+ }
+ }
+
+ // Validate task ID
+ if params.TaskID == "" {
+ logger.Warnf("[dubbo-go-pixiu] A2A Server get task status
missing task_id")
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeInvalidParams,
+ Message: "Missing required field: task_id",
+ },
+ }
+ }
+
+ // Get task from task manager
+ task, err := f.taskManager.GetTask(params.TaskID)
+ if err != nil {
+ logger.Warnf("[dubbo-go-pixiu] A2A Server task not found: %s",
params.TaskID)
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeTaskNotFound,
+ Message: fmt.Sprintf("Task not found: %s",
params.TaskID),
+ },
+ }
+ }
+
+ logger.Infof("[dubbo-go-pixiu] A2A Server retrieved task status: %s
(status: %s)", params.TaskID, task.Status)
+
+ // Build response
+ response := GetTaskStatusResponse{
+ Task: *task,
+ }
+
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Result: response,
+ }
+}
+
+func (f *A2AFilter) handleUpdateTask(req *JSONRPCRequest) *JSONRPCResponse {
+ // Parse request parameters
+ paramsBytes, err := json.Marshal(req.Params)
+ if err != nil {
+ logger.Errorf("[dubbo-go-pixiu] A2A Server failed to marshal
update task params: %v", err)
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeInvalidParams,
+ Message: "Invalid parameters",
+ },
+ }
+ }
+
+ var params UpdateTaskRequest
+ if err := json.Unmarshal(paramsBytes, ¶ms); err != nil {
+ logger.Errorf("[dubbo-go-pixiu] A2A Server failed to parse
update task params: %v", err)
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeInvalidParams,
+ Message: "Invalid update task parameters",
+ },
+ }
+ }
+
+ // Validate task ID
+ if params.TaskID == "" {
+ logger.Warnf("[dubbo-go-pixiu] A2A Server update task missing
task_id")
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeInvalidParams,
+ Message: "Missing required field: task_id",
+ },
+ }
+ }
+
+ // Update task
+ err = f.taskManager.UpdateTask(params.TaskID, params.Status,
params.Result, params.Error)
+ if err != nil {
+ logger.Warnf("[dubbo-go-pixiu] A2A Server failed to update task
%s: %v", params.TaskID, err)
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeTaskNotFound,
+ Message: fmt.Sprintf("Failed to update task:
%v", err),
+ },
+ }
+ }
+
+ logger.Infof("[dubbo-go-pixiu] A2A Server updated task %s to status
%s", params.TaskID, params.Status)
+
+ // Build response
+ response := UpdateTaskResponse{
+ Success: true,
+ Message: "Task updated successfully",
+ }
+
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Result: response,
+ }
+}
+
+func (f *A2AFilter) handleCancelTask(req *JSONRPCRequest) *JSONRPCResponse {
+ // Parse request parameters
+ paramsBytes, err := json.Marshal(req.Params)
+ if err != nil {
+ logger.Errorf("[dubbo-go-pixiu] A2A Server failed to marshal
cancel task params: %v", err)
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeInvalidParams,
+ Message: "Invalid parameters",
+ },
+ }
+ }
+
+ var params struct {
+ TaskID string `json:"task_id"`
+ }
+ if err := json.Unmarshal(paramsBytes, ¶ms); err != nil {
+ logger.Errorf("[dubbo-go-pixiu] A2A Server failed to parse
cancel task params: %v", err)
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeInvalidParams,
+ Message: "Invalid cancel task parameters",
+ },
+ }
+ }
+
+ // Validate task ID
+ if params.TaskID == "" {
+ logger.Warnf("[dubbo-go-pixiu] A2A Server cancel task missing
task_id")
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeInvalidParams,
+ Message: "Missing required field: task_id",
+ },
+ }
+ }
+
+ // Cancel task
+ err = f.taskManager.CancelTask(params.TaskID)
+ if err != nil {
+ logger.Warnf("[dubbo-go-pixiu] A2A Server failed to cancel task
%s: %v", params.TaskID, err)
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeTaskNotFound,
+ Message: fmt.Sprintf("Failed to cancel task:
%v", err),
+ },
+ }
+ }
+
+ logger.Infof("[dubbo-go-pixiu] A2A Server canceled task %s",
params.TaskID)
+
+ // Build response
+ response := UpdateTaskResponse{
+ Success: true,
+ Message: "Task canceled successfully",
+ }
+
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Result: response,
+ }
+}
+
+func (f *A2AFilter) handleSendMessage(req *JSONRPCRequest) *JSONRPCResponse {
+ // Parse request parameters
+ paramsBytes, err := json.Marshal(req.Params)
+ if err != nil {
+ logger.Errorf("[dubbo-go-pixiu] A2A Server failed to marshal
send message params: %v", err)
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeInvalidParams,
+ Message: "Invalid parameters",
+ },
+ }
+ }
+
+ var params SendMessageRequest
+ if err := json.Unmarshal(paramsBytes, ¶ms); err != nil {
+ logger.Errorf("[dubbo-go-pixiu] A2A Server failed to parse send
message params: %v", err)
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeInvalidParams,
+ Message: "Invalid send message parameters",
+ },
+ }
+ }
+
+ // Validate required fields
+ if params.To == "" || params.Type == "" || params.Content == nil {
+ logger.Warnf("[dubbo-go-pixiu] A2A Server send message missing
required fields")
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeInvalidParams,
+ Message: "Missing required fields: to, type,
content",
+ },
+ }
+ }
+
+ // Find target agent
+ f.mutex.RLock()
+ targetAgent, exists := f.agentMap[params.To]
+ agentInfo := f.cfg.GetDefaultAgentInfo()
+ f.mutex.RUnlock()
+
+ if !exists {
+ logger.Warnf("[dubbo-go-pixiu] A2A Server target agent not
found: %s", params.To)
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeAgentNotFound,
+ Message: fmt.Sprintf("Agent not found: %s",
params.To),
+ },
+ }
+ }
+
+ // Generate message ID
+ messageID := fmt.Sprintf("msg_%d_%d", time.Now().UnixMilli(),
time.Now().UnixNano()%1000)
+
+ // Build message
+ message := &Message{
+ MessageID: messageID,
+ From: agentInfo.AgentID,
+ To: params.To,
+ Type: params.Type,
+ Content: params.Content,
+ Timestamp: time.Now().UnixMilli(),
+ }
+
+ // Forward message to agent
+ err = f.forwardMessageToAgent(targetAgent, message)
+ if err != nil {
+ logger.Errorf("[dubbo-go-pixiu] A2A Server failed to forward
message to %s: %v", params.To, err)
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeInternalError,
+ Message: fmt.Sprintf("Failed to send message:
%v", err),
+ },
+ }
+ }
+
+ logger.Infof("[dubbo-go-pixiu] A2A Server sent message %s to agent %s",
messageID, params.To)
+
+ // Build response
+ response := SendMessageResponse{
+ MessageID: messageID,
+ Success: true,
+ Message: "Message sent successfully",
+ }
+
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Result: response,
+ }
+}
+
+func (f *A2AFilter) handleBroadcastMessage(req *JSONRPCRequest)
*JSONRPCResponse {
+ // Parse request parameters
+ paramsBytes, err := json.Marshal(req.Params)
+ if err != nil {
+ logger.Errorf("[dubbo-go-pixiu] A2A Server failed to marshal
broadcast message params: %v", err)
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeInvalidParams,
+ Message: "Invalid parameters",
+ },
+ }
+ }
+
+ var params SendMessageRequest
+ if err := json.Unmarshal(paramsBytes, ¶ms); err != nil {
+ logger.Errorf("[dubbo-go-pixiu] A2A Server failed to parse
broadcast message params: %v", err)
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeInvalidParams,
+ Message: "Invalid broadcast message parameters",
+ },
+ }
+ }
+
+ // Validate required fields
+ if params.Type == "" || params.Content == nil {
+ logger.Warnf("[dubbo-go-pixiu] A2A Server broadcast message
missing required fields")
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Error: &RPCError{
+ Code: ErrorCodeInvalidParams,
+ Message: "Missing required fields: type,
content",
+ },
+ }
+ }
+
+ // Get current agent info
+ f.mutex.RLock()
+ agentInfo := f.cfg.GetDefaultAgentInfo()
+
+ // Collect target agents (online agents excluding self)
+ var targetAgents []*AgentInfo
+ for _, agent := range f.agentMap {
+ if agent.AgentID != agentInfo.AgentID && agent.Status ==
StatusOnline {
+ targetAgents = append(targetAgents, agent)
+ }
+ }
+ f.mutex.RUnlock()
+
+ totalAgents := len(targetAgents)
+ if totalAgents == 0 {
+ logger.Infof("[dubbo-go-pixiu] A2A Server no online agents to
broadcast to")
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Result: BroadcastMessageResponse{
+ TotalAgents: 0,
+ SuccessCount: 0,
+ MessageIDs: []string{},
+ },
+ }
+ }
+
+ // Use channel to collect results
+ type result struct {
+ agentID string
+ messageID string
+ err error
+ }
+ resultCh := make(chan result, totalAgents)
+
+ // Broadcast to all target agents concurrently
+ for _, agent := range targetAgents {
+ go func(targetAgent *AgentInfo) {
+ // Generate unique message ID
+ messageID := fmt.Sprintf("msg_%d_%s_%d",
time.Now().UnixMilli(), targetAgent.AgentID, time.Now().UnixNano()%1000)
+
+ // Build message
+ message := &Message{
+ MessageID: messageID,
+ From: agentInfo.AgentID,
+ To: targetAgent.AgentID,
+ Type: params.Type,
+ Content: params.Content,
+ Timestamp: time.Now().UnixMilli(),
+ }
+
+ // Forward message
+ err := f.forwardMessageToAgent(targetAgent, message)
+ resultCh <- result{
+ agentID: targetAgent.AgentID,
+ messageID: messageID,
+ err: err,
+ }
+ }(agent)
+ }
+
+ // Collect results
+ var successCount int
+ var failedAgents []string
+ var messageIDs []string
+
+ for i := 0; i < totalAgents; i++ {
+ res := <-resultCh
+ if res.err != nil {
+ logger.Errorf("[dubbo-go-pixiu] A2A Server failed to
broadcast to agent %s: %v", res.agentID, res.err)
+ failedAgents = append(failedAgents, res.agentID)
+ } else {
+ successCount++
+ messageIDs = append(messageIDs, res.messageID)
+ }
+ }
+ close(resultCh)
+
+ logger.Infof("[dubbo-go-pixiu] A2A Server broadcast message to %d
agents: %d succeeded, %d failed",
+ totalAgents, successCount, len(failedAgents))
+
+ // Build response
+ response := BroadcastMessageResponse{
+ TotalAgents: totalAgents,
+ SuccessCount: successCount,
+ FailedAgents: failedAgents,
+ MessageIDs: messageIDs,
+ }
+
+ return &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: req.ID,
+ Result: response,
+ }
+}
+
+// Utility methods
+
+// sendErrorResponse sends an error response to the client
+func (f *A2AFilter) sendErrorResponse(ctx *contexthttp.HttpContext, id any,
code int, message string) {
+ response := &JSONRPCResponse{
+ JSONRPC: JSONRPCVersion,
+ ID: id,
+ Error: &RPCError{
+ Code: code,
+ Message: message,
+ },
+ }
+ f.sendJSONResponse(ctx, response)
+}
+
+// sendJSONResponse sends a JSON response to the client
+func (f *A2AFilter) sendJSONResponse(ctx *contexthttp.HttpContext, response
*JSONRPCResponse) {
+ responseBody, err := json.Marshal(response)
+ if err != nil {
+ logger.Errorf("[dubbo-go-pixiu] A2A Server failed to marshal
response: %v", err)
+ ctx.SendLocalReply(500, []byte("Internal Server Error"))
+ return
+ }
+
+ // SendLocalReply automatically sets Content-Type to application/json for valid JSON
+ ctx.SendLocalReply(200, responseBody)
+}
+
+// forwardMessageToAgent forwards a message to a specific agent via HTTP
+func (f *A2AFilter) forwardMessageToAgent(agent *AgentInfo, message *Message)
error {
+ // Build JSON-RPC request for receiving message
+ jsonrpcReq := JSONRPCRequest{
+ JSONRPC: JSONRPCVersion,
+ Method: "a2a.receive_message",
+ Params: message,
+ ID: fmt.Sprintf("msg_%d", time.Now().UnixMilli()),
+ }
+
+ // Marshal request body
+ reqBody, err := json.Marshal(jsonrpcReq)
+ if err != nil {
+ return fmt.Errorf("failed to marshal message request: %v", err)
+ }
+
+ // Create HTTP request
+ httpReq, err := http.NewRequest("POST", agent.Endpoint,
bytes.NewReader(reqBody))
+ if err != nil {
+ return fmt.Errorf("failed to create HTTP request: %v", err)
+ }
+
+ // Set headers
+ httpReq.Header.Set("Content-Type", "application/json")
+
+ // Create HTTP client with timeout
+ httpClient := &http.Client{
+ Timeout: 30 * time.Second,
+ }
+
+ // Send request
+ resp, err := httpClient.Do(httpReq)
+ if err != nil {
+ return fmt.Errorf("failed to send message to agent %s: %v",
agent.AgentID, err)
+ }
+ defer resp.Body.Close()
+
+ // Check response status
+ if resp.StatusCode != 200 {
+ bodyBytes, _ := io.ReadAll(resp.Body)
+ return fmt.Errorf("agent %s returned status %d: %s",
agent.AgentID, resp.StatusCode, string(bodyBytes))
+ }
+
+ // Parse response
+ var jsonrpcResp JSONRPCResponse
+ if err := json.NewDecoder(resp.Body).Decode(&jsonrpcResp); err != nil {
+ return fmt.Errorf("failed to parse response from agent %s: %v",
agent.AgentID, err)
+ }
+
+ // Check for JSON-RPC error
+ if jsonrpcResp.Error != nil {
+ return fmt.Errorf("agent %s returned error: %s", agent.AgentID,
jsonrpcResp.Error.Message)
+ }
+
+ logger.Infof("[dubbo-go-pixiu] A2A Server successfully forwarded
message %s to agent %s", message.MessageID, agent.AgentID)
+ return nil
+}
diff --git a/pkg/filter/a2a/filter_test.go b/pkg/filter/a2a/filter_test.go
new file mode 100644
index 00000000..38f67a6c
--- /dev/null
+++ b/pkg/filter/a2a/filter_test.go
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package a2a
+
+import (
+ "bytes"
+ "encoding/json"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
+ "github.com/apache/dubbo-go-pixiu/pkg/context/mock"
+)
+
+// createTestConfig returns the configuration shared by the filter tests: the
+// local agent "test-agent", one known peer "agent-1", and short task timings.
+func createTestConfig() *Config {
+	localAgent := &AgentConfig{
+		AgentID:     "test-agent",
+		Name:        "Test Agent",
+		Version:     "1.0.0",
+		Description: "Test agent for unit tests",
+		Endpoint:    "http://localhost:8888/a2a",
+		Status:      StatusOnline,
+		Capabilities: []CapabilityConfig{
+			{
+				Name:        "test_capability",
+				Description: "Test capability",
+				Tags:        []string{"test"},
+			},
+		},
+	}
+
+	peerAgent := AgentConfig{
+		AgentID:  "agent-1",
+		Name:     "Agent 1",
+		Version:  "1.0.0",
+		Endpoint: "http://localhost:9001/a2a",
+		Status:   StatusOnline,
+	}
+
+	taskSettings := &TaskConfig{
+		DefaultTimeout:     5000,
+		MaxConcurrentTasks: 10,
+		CleanupInterval:    1000,
+		TaskRetention:      5000,
+	}
+
+	return &Config{
+		Endpoint:    "/a2a",
+		AgentInfo:   localAgent,
+		KnownAgents: []AgentConfig{peerAgent},
+		TaskConfig:  taskSettings,
+	}
+}
+
+// createTestFilterFactory builds a FilterFactory from createTestConfig and
+// calls Apply on it before handing it to the test.
+func createTestFilterFactory() *FilterFactory {
+	ff := &FilterFactory{cfg: createTestConfig()}
+	ff.Apply()
+	return ff
+}
+
+// buildJSONRPCRequest encodes a JSON-RPC request with the given method and
+// params and the fixed ID 1, and returns the JSON bytes.
+//
+// It panics if params cannot be marshaled: returning a nil body instead would
+// let the calling test run against an empty request and fail (or pass) for
+// the wrong reason.
+func buildJSONRPCRequest(method string, params any) []byte {
+	req := JSONRPCRequest{
+		JSONRPC: JSONRPCVersion,
+		Method:  method,
+		Params:  params,
+		ID:      1,
+	}
+	body, err := json.Marshal(req)
+	if err != nil {
+		panic("buildJSONRPCRequest: " + err.Error())
+	}
+	return body
+}
+
+// TestFilterFactory_Apply verifies that Apply populates the agent map and the
+// task manager from the test configuration.
+func TestFilterFactory_Apply(t *testing.T) {
+	ff := createTestFilterFactory()
+
+	assert.NotNil(t, ff.agentMap)
+	assert.NotNil(t, ff.taskManager)
+
+	// Two entries are expected: the local "test-agent" plus the peer "agent-1".
+	agentCount := len(ff.agentMap)
+	assert.Equal(t, 2, agentCount)
+}
+
+// TestA2AFilter_HandlePing sends a ping request to the A2A endpoint and
+// expects Decode to return filter.Stop.
+func TestA2AFilter_HandlePing(t *testing.T) {
+	ff := createTestFilterFactory()
+
+	payload := bytes.NewReader(buildJSONRPCRequest(MethodPing, nil))
+	req, err := http.NewRequest("POST", "http://localhost:8888/a2a", payload)
+	assert.NoError(t, err)
+	req.Header.Set("Content-Type", "application/json")
+
+	httpCtx := mock.GetMockHTTPContext(req)
+	a2aFilter := &A2AFilter{
+		cfg:         ff.cfg,
+		agentMap:    ff.agentMap,
+		taskManager: ff.taskManager,
+		mutex:       &ff.mutex,
+	}
+
+	// Only the returned status is asserted. The response written by the
+	// filter is not inspected here; doing so would need access to the
+	// context's writer or an httptest.ResponseRecorder.
+	got := a2aFilter.Decode(httpCtx)
+	assert.Equal(t, filter.Stop, got)
+}
+
+// TestA2AFilter_HandleGetAgentCard sends a get_agent_card request and expects
+// Decode to return filter.Stop.
+func TestA2AFilter_HandleGetAgentCard(t *testing.T) {
+	ff := createTestFilterFactory()
+
+	payload := bytes.NewReader(buildJSONRPCRequest(MethodGetAgentCard, nil))
+	req, err := http.NewRequest("POST", "http://localhost:8888/a2a", payload)
+	assert.NoError(t, err)
+	req.Header.Set("Content-Type", "application/json")
+
+	httpCtx := mock.GetMockHTTPContext(req)
+	a2aFilter := &A2AFilter{
+		cfg:         ff.cfg,
+		agentMap:    ff.agentMap,
+		taskManager: ff.taskManager,
+		mutex:       &ff.mutex,
+	}
+
+	got := a2aFilter.Decode(httpCtx)
+	assert.Equal(t, filter.Stop, got)
+}
+
+// TestA2AFilter_HandleDiscoverAgents sends a discover_agents request without
+// parameters and expects Decode to return filter.Stop.
+func TestA2AFilter_HandleDiscoverAgents(t *testing.T) {
+	ff := createTestFilterFactory()
+
+	payload := bytes.NewReader(buildJSONRPCRequest(MethodDiscoverAgents, nil))
+	req, err := http.NewRequest("POST", "http://localhost:8888/a2a", payload)
+	assert.NoError(t, err)
+	req.Header.Set("Content-Type", "application/json")
+
+	httpCtx := mock.GetMockHTTPContext(req)
+	a2aFilter := &A2AFilter{
+		cfg:         ff.cfg,
+		agentMap:    ff.agentMap,
+		taskManager: ff.taskManager,
+		mutex:       &ff.mutex,
+	}
+
+	got := a2aFilter.Decode(httpCtx)
+	assert.Equal(t, filter.Stop, got)
+}
+
+// TestA2AFilter_HandleCreateTask sends a complete create-task request and
+// expects Decode to return filter.Stop with exactly one task stored in the
+// task manager afterwards.
+func TestA2AFilter_HandleCreateTask(t *testing.T) {
+	ff := createTestFilterFactory()
+
+	createReq := CreateTaskRequest{
+		To:      "agent-1",
+		Type:    "test_task",
+		Content: map[string]any{"message": "test"},
+		Timeout: 3000,
+	}
+
+	payload := bytes.NewReader(buildJSONRPCRequest(MethodCreateTask, createReq))
+	req, err := http.NewRequest("POST", "http://localhost:8888/a2a", payload)
+	assert.NoError(t, err)
+	req.Header.Set("Content-Type", "application/json")
+
+	httpCtx := mock.GetMockHTTPContext(req)
+	a2aFilter := &A2AFilter{
+		cfg:         ff.cfg,
+		agentMap:    ff.agentMap,
+		taskManager: ff.taskManager,
+		mutex:       &ff.mutex,
+	}
+
+	got := a2aFilter.Decode(httpCtx)
+	assert.Equal(t, filter.Stop, got)
+
+	// The request must have left exactly one task behind.
+	stored := ff.taskManager.GetTaskCount()
+	assert.Equal(t, 1, stored)
+}
+
+// TestA2AFilter_HandleCreateTask_MissingParams sends a create-task request
+// that only names the target agent and expects Decode to return filter.Stop.
+func TestA2AFilter_HandleCreateTask_MissingParams(t *testing.T) {
+	ff := createTestFilterFactory()
+
+	// Type and Content are intentionally left at their zero values.
+	incomplete := CreateTaskRequest{To: "agent-1"}
+
+	payload := bytes.NewReader(buildJSONRPCRequest(MethodCreateTask, incomplete))
+	req, err := http.NewRequest("POST", "http://localhost:8888/a2a", payload)
+	assert.NoError(t, err)
+	req.Header.Set("Content-Type", "application/json")
+
+	httpCtx := mock.GetMockHTTPContext(req)
+	a2aFilter := &A2AFilter{
+		cfg:         ff.cfg,
+		agentMap:    ff.agentMap,
+		taskManager: ff.taskManager,
+		mutex:       &ff.mutex,
+	}
+
+	got := a2aFilter.Decode(httpCtx)
+	assert.Equal(t, filter.Stop, got)
+}
+
+// TestA2AFilter_HandleSendMessage points "agent-1" at a local httptest server
+// that answers every request with a JSON-RPC result, sends a message to that
+// agent, and expects Decode to return filter.Stop.
+func TestA2AFilter_HandleSendMessage(t *testing.T) {
+	// Stand-in for the remote agent: replies with a fixed JSON-RPC result.
+	agentHandler := func(w http.ResponseWriter, r *http.Request) {
+		reply := JSONRPCResponse{
+			JSONRPC: JSONRPCVersion,
+			ID:      1,
+			Result:  map[string]string{"status": "received"},
+		}
+		json.NewEncoder(w).Encode(reply)
+	}
+	agentServer := httptest.NewServer(http.HandlerFunc(agentHandler))
+	defer agentServer.Close()
+
+	// Redirect the known agent to the stand-in server.
+	ff := createTestFilterFactory()
+	ff.agentMap["agent-1"].Endpoint = agentServer.URL
+
+	sendReq := SendMessageRequest{
+		To:      "agent-1",
+		Type:    "test_message",
+		Content: map[string]any{"text": "Hello, agent!"},
+	}
+
+	payload := bytes.NewReader(buildJSONRPCRequest(MethodSendMessage, sendReq))
+	req, err := http.NewRequest("POST", "http://localhost:8888/a2a", payload)
+	assert.NoError(t, err)
+	req.Header.Set("Content-Type", "application/json")
+
+	httpCtx := mock.GetMockHTTPContext(req)
+	a2aFilter := &A2AFilter{
+		cfg:         ff.cfg,
+		agentMap:    ff.agentMap,
+		taskManager: ff.taskManager,
+		mutex:       &ff.mutex,
+	}
+
+	got := a2aFilter.Decode(httpCtx)
+	assert.Equal(t, filter.Stop, got)
+}
+
+// TestA2AFilter_HandleSendMessage_AgentNotFound addresses a message to an
+// agent ID that is not in the test configuration and expects Decode to return
+// filter.Stop.
+func TestA2AFilter_HandleSendMessage_AgentNotFound(t *testing.T) {
+	ff := createTestFilterFactory()
+
+	sendReq := SendMessageRequest{
+		To:      "non-existent-agent",
+		Type:    "test_message",
+		Content: map[string]any{"text": "Hello"},
+	}
+
+	payload := bytes.NewReader(buildJSONRPCRequest(MethodSendMessage, sendReq))
+	req, err := http.NewRequest("POST", "http://localhost:8888/a2a", payload)
+	assert.NoError(t, err)
+	req.Header.Set("Content-Type", "application/json")
+
+	httpCtx := mock.GetMockHTTPContext(req)
+	a2aFilter := &A2AFilter{
+		cfg:         ff.cfg,
+		agentMap:    ff.agentMap,
+		taskManager: ff.taskManager,
+		mutex:       &ff.mutex,
+	}
+
+	got := a2aFilter.Decode(httpCtx)
+	assert.Equal(t, filter.Stop, got)
+}
+
+// TestA2AFilter_NonA2ARequest issues a GET for a path other than the
+// configured A2A endpoint and expects Decode to return filter.Continue.
+func TestA2AFilter_NonA2ARequest(t *testing.T) {
+	ff := createTestFilterFactory()
+
+	req, err := http.NewRequest("GET", "http://localhost:8888/other-path", nil)
+	assert.NoError(t, err)
+
+	httpCtx := mock.GetMockHTTPContext(req)
+	a2aFilter := &A2AFilter{
+		cfg:         ff.cfg,
+		agentMap:    ff.agentMap,
+		taskManager: ff.taskManager,
+		mutex:       &ff.mutex,
+	}
+
+	got := a2aFilter.Decode(httpCtx)
+	assert.Equal(t, filter.Continue, got)
+}
+
+// TestA2AFilter_InvalidJSON posts a body that is not JSON to the A2A endpoint
+// and expects Decode to return filter.Stop.
+func TestA2AFilter_InvalidJSON(t *testing.T) {
+	ff := createTestFilterFactory()
+
+	garbage := []byte("invalid json")
+	req, err := http.NewRequest("POST", "http://localhost:8888/a2a", bytes.NewReader(garbage))
+	assert.NoError(t, err)
+	req.Header.Set("Content-Type", "application/json")
+
+	httpCtx := mock.GetMockHTTPContext(req)
+	a2aFilter := &A2AFilter{
+		cfg:         ff.cfg,
+		agentMap:    ff.agentMap,
+		taskManager: ff.taskManager,
+		mutex:       &ff.mutex,
+	}
+
+	got := a2aFilter.Decode(httpCtx)
+	assert.Equal(t, filter.Stop, got)
+}
+
+// TestA2AFilter_UnknownMethod calls the method name "a2a.unknown_method" and
+// expects Decode to return filter.Stop.
+func TestA2AFilter_UnknownMethod(t *testing.T) {
+	ff := createTestFilterFactory()
+
+	payload := bytes.NewReader(buildJSONRPCRequest("a2a.unknown_method", nil))
+	req, err := http.NewRequest("POST", "http://localhost:8888/a2a", payload)
+	assert.NoError(t, err)
+	req.Header.Set("Content-Type", "application/json")
+
+	httpCtx := mock.GetMockHTTPContext(req)
+	a2aFilter := &A2AFilter{
+		cfg:         ff.cfg,
+		agentMap:    ff.agentMap,
+		taskManager: ff.taskManager,
+		mutex:       &ff.mutex,
+	}
+
+	got := a2aFilter.Decode(httpCtx)
+	assert.Equal(t, filter.Stop, got)
+}
diff --git a/pkg/filter/a2a/plugin.go b/pkg/filter/a2a/plugin.go
new file mode 100644
index 00000000..651df514
--- /dev/null
+++ b/pkg/filter/a2a/plugin.go
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package a2a
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
+)
+
+// init registers the A2A plugin with the HTTP filter registry when the
+// package is imported.
+func init() {
+	plugin := new(Plugin)
+	filter.RegisterHttpFilter(plugin)
+}
+
+// Plugin is the stateless entry point registered for the A2A Server Filter;
+// it implements filter.HttpFilterPlugin.
+type Plugin struct{}
+
+// Kind reports the identifier under which this plugin is registered.
+func (p *Plugin) Kind() string {
+	return Kind
+}
+
+// CreateFilterFactory returns a fresh FilterFactory that starts out with an
+// empty Config. The returned error is always nil.
+func (p *Plugin) CreateFilterFactory() (filter.HttpFilterFactory, error) {
+	factory := &FilterFactory{cfg: new(Config)}
+	return factory, nil
+}
diff --git a/pkg/filter/a2a/task_manager.go b/pkg/filter/a2a/task_manager.go
new file mode 100644
index 00000000..f6be133e
--- /dev/null
+++ b/pkg/filter/a2a/task_manager.go
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package a2a
+
+import (
+ "fmt"
+ "sync"
+ "time"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+// TaskManager manages tasks in memory (simplified implementation for phase 1).
+//
+// CreateTask, GetTask, UpdateTask, ListTasks, GetTaskCount and the periodic
+// cleanup pass all acquire mutex before touching the task map.
+type TaskManager struct {
+ // config holds the limits and timings (default timeout, maximum task
+ // count, cleanup interval, retention) read by the methods below.
+ config *TaskConfig
+ // tasks maps a task ID to its stored task.
+ tasks map[string]*Task
+ // mutex guards tasks and idCounter.
+ mutex sync.RWMutex
+ // idCounter is appended to generated task IDs and incremented on every
+ // CreateTask; NewTaskManager starts it at 1.
+ idCounter int64
+ // stopCh is closed by Stop to make cleanupRoutine return.
+ stopCh chan struct{}
+ // cleanupTicker drives cleanupRoutine; it stays nil when the configured
+ // cleanup interval is not positive.
+ cleanupTicker *time.Ticker
+}
+
+// NewTaskManager creates a new task manager instance
+func NewTaskManager(config *TaskConfig) *TaskManager {
+ tm := &TaskManager{
+ config: config,
+ tasks: make(map[string]*Task),
+ idCounter: 1,
+ stopCh: make(chan struct{}),
+ }
+
+ // Start cleanup routine if cleanup interval is configured
+ if config.CleanupInterval > 0 {
+ tm.cleanupTicker =
time.NewTicker(time.Duration(config.CleanupInterval) * time.Millisecond)
+ go tm.cleanupRoutine()
+ logger.Infof("[dubbo-go-pixiu] A2A TaskManager cleanup routine
started with interval %d ms", config.CleanupInterval)
+ }
+
+ return tm
+}
+
+// CreateTask creates a new task
+func (tm *TaskManager) CreateTask(from, to, taskType string, content
map[string]any, timeout int64) (*Task, error) {
+ tm.mutex.Lock()
+ defer tm.mutex.Unlock()
+
+ // Check concurrent task limit
+ if len(tm.tasks) >= tm.config.MaxConcurrentTasks {
+ return nil, fmt.Errorf("maximum concurrent tasks limit reached:
%d", tm.config.MaxConcurrentTasks)
+ }
+
+ // Generate task ID
+ taskID := fmt.Sprintf("task_%d_%d", time.Now().UnixMilli(),
tm.idCounter)
+ tm.idCounter++
+
+ // Use default timeout if not specified
+ if timeout <= 0 {
+ timeout = tm.config.DefaultTimeout
+ }
+
+ now := time.Now().UnixMilli()
+ task := &Task{
+ TaskID: taskID,
+ From: from,
+ To: to,
+ Type: taskType,
+ Content: content,
+ Status: TaskPending,
+ CreatedAt: now,
+ UpdatedAt: now,
+ Timeout: timeout,
+ }
+
+ tm.tasks[taskID] = task
+ logger.Infof("[dubbo-go-pixiu] A2A TaskManager created task %s: %s ->
%s (%s)", taskID, from, to, taskType)
+
+ return task, nil
+}
+
+// GetTask looks up the task with the given ID and returns a copy of it, or a
+// "task not found" error when no such task is stored.
+//
+// The copy is shallow: the struct is duplicated, but the Content and Result
+// maps are still shared with the stored task.
+func (tm *TaskManager) GetTask(taskID string) (*Task, error) {
+	tm.mutex.RLock()
+	defer tm.mutex.RUnlock()
+
+	if stored, ok := tm.tasks[taskID]; ok {
+		snapshot := *stored
+		return &snapshot, nil
+	}
+	return nil, fmt.Errorf("task not found: %s", taskID)
+}
+
+// UpdateTask updates task status and result
+func (tm *TaskManager) UpdateTask(taskID string, status TaskStatus, result
map[string]any, errorMsg string) error {
+ tm.mutex.Lock()
+ defer tm.mutex.Unlock()
+
+ task, exists := tm.tasks[taskID]
+ if !exists {
+ return fmt.Errorf("task not found: %s", taskID)
+ }
+
+ // Update task fields
+ task.Status = status
+ task.UpdatedAt = time.Now().UnixMilli()
+
+ if result != nil {
+ task.Result = result
+ }
+
+ if errorMsg != "" {
+ task.Error = errorMsg
+ }
+
+ logger.Infof("[dubbo-go-pixiu] A2A TaskManager updated task %s status
to %s", taskID, status)
+ return nil
+}
+
+// CancelTask cancels a task
+func (tm *TaskManager) CancelTask(taskID string) error {
+ return tm.UpdateTask(taskID, TaskCancelled, nil, "Task canceled")
+}
+
+// ListTasks returns a new map holding a shallow copy of every stored task,
+// keyed by task ID (intended for debugging).
+func (tm *TaskManager) ListTasks() map[string]*Task {
+	tm.mutex.RLock()
+	defer tm.mutex.RUnlock()
+
+	snapshots := make(map[string]*Task, len(tm.tasks))
+	for id := range tm.tasks {
+		// Copy the struct so callers cannot modify the stored task's fields.
+		snapshot := *tm.tasks[id]
+		snapshots[id] = &snapshot
+	}
+	return snapshots
+}
+
+// GetTaskCount reports how many tasks are currently stored, regardless of
+// their status.
+func (tm *TaskManager) GetTaskCount() int {
+	tm.mutex.RLock()
+	count := len(tm.tasks)
+	tm.mutex.RUnlock()
+	return count
+}
+
+// Stop halts the cleanup ticker (if one was started) and closes stopCh so
+// that cleanupRoutine returns.
+//
+// Stop is safe to call more than once and on a TaskManager whose stopCh was
+// never initialized: only the first effective call closes the channel, later
+// calls return without doing anything. Previously a second call panicked with
+// "close of closed channel".
+func (tm *TaskManager) Stop() {
+	// The manager's mutex serializes concurrent Stop calls so the
+	// check-then-close below cannot race with itself.
+	tm.mutex.Lock()
+	defer tm.mutex.Unlock()
+
+	if tm.stopCh == nil {
+		return
+	}
+	select {
+	case <-tm.stopCh:
+		// Already stopped.
+		return
+	default:
+	}
+
+	if tm.cleanupTicker != nil {
+		tm.cleanupTicker.Stop()
+	}
+	close(tm.stopCh)
+	logger.Infof("[dubbo-go-pixiu] A2A TaskManager stopped")
+}
+
+// cleanupRoutine runs a cleanup pass on every tick of cleanupTicker and
+// returns once stopCh is closed. NewTaskManager starts it in its own
+// goroutine.
+func (tm *TaskManager) cleanupRoutine() {
+	for {
+		select {
+		case <-tm.stopCh:
+			return
+		case <-tm.cleanupTicker.C:
+			tm.cleanupExpiredTasks()
+		}
+	}
+}
+
+// cleanupExpiredTasks performs one cleanup pass over all stored tasks:
+//
+//  1. A pending or running task whose age (now - CreatedAt, milliseconds)
+//     exceeds its Timeout is marked TaskFailed with the error "Task timeout".
+//  2. A completed, failed or canceled task whose last update is older than
+//     config.TaskRetention milliseconds is removed from the map.
+//
+// A task timed out in step 1 is evaluated by step 2 in the same pass, with
+// its UpdatedAt already set to the current time.
+func (tm *TaskManager) cleanupExpiredTasks() {
+	tm.mutex.Lock()
+	defer tm.mutex.Unlock()
+
+	now := time.Now().UnixMilli()
+	var expired []string
+
+	for id, task := range tm.tasks {
+		// Step 1: fail tasks that are still active past their timeout.
+		active := task.Status == TaskPending || task.Status == TaskRunning
+		if active && now-task.CreatedAt > task.Timeout {
+			task.Status = TaskFailed
+			task.Error = "Task timeout"
+			task.UpdatedAt = now
+			logger.Warnf("[dubbo-go-pixiu] A2A TaskManager task %s timed out", id)
+		}
+
+		// Step 2: collect finished tasks that outlived the retention window.
+		switch task.Status {
+		case TaskCompleted, TaskFailed, TaskCancelled:
+			if now-task.UpdatedAt > tm.config.TaskRetention {
+				expired = append(expired, id)
+			}
+		}
+	}
+
+	for _, id := range expired {
+		delete(tm.tasks, id)
+		logger.Debugf("[dubbo-go-pixiu] A2A TaskManager cleaned up task %s", id)
+	}
+
+	if removed := len(expired); removed > 0 {
+		logger.Infof("[dubbo-go-pixiu] A2A TaskManager cleaned up %d expired tasks", removed)
+	}
+}
diff --git a/pkg/filter/a2a/task_manager_test.go
b/pkg/filter/a2a/task_manager_test.go
new file mode 100644
index 00000000..06e8d802
--- /dev/null
+++ b/pkg/filter/a2a/task_manager_test.go
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package a2a
+
+import (
+ "testing"
+ "time"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+)
+
+// TestTaskManager_CreateAndGet creates one task and checks that its fields
+// match the arguments and that GetTask returns a task with the same identity.
+func TestTaskManager_CreateAndGet(t *testing.T) {
+	// CleanupInterval 0 keeps the background cleanup routine switched off.
+	tm := NewTaskManager(&TaskConfig{
+		DefaultTimeout:     5000,
+		MaxConcurrentTasks: 10,
+		CleanupInterval:    0,
+	})
+	defer tm.Stop()
+
+	content := map[string]any{"message": "test"}
+	created, err := tm.CreateTask("agent-1", "agent-2", "test_task", content, 3000)
+
+	assert.NoError(t, err)
+	assert.NotNil(t, created)
+	assert.Equal(t, "agent-1", created.From)
+	assert.Equal(t, "agent-2", created.To)
+	assert.Equal(t, "test_task", created.Type)
+	assert.Equal(t, TaskPending, created.Status)
+
+	fetched, err := tm.GetTask(created.TaskID)
+	assert.NoError(t, err)
+	assert.Equal(t, created.TaskID, fetched.TaskID)
+	assert.Equal(t, created.From, fetched.From)
+}
+
+// TestTaskManager_UpdateTask completes a task with a result and checks that
+// GetTask afterwards reports the new status and the stored result.
+func TestTaskManager_UpdateTask(t *testing.T) {
+	tm := NewTaskManager(&TaskConfig{
+		DefaultTimeout:     5000,
+		MaxConcurrentTasks: 10,
+		CleanupInterval:    0,
+	})
+	defer tm.Stop()
+
+	content := map[string]any{"data": "test"}
+	created, err := tm.CreateTask("agent-1", "agent-2", "test_task", content, 0)
+	assert.NoError(t, err)
+
+	// Mark the task completed and attach a result; no error message.
+	outcome := map[string]any{"output": "success"}
+	err = tm.UpdateTask(created.TaskID, TaskCompleted, outcome, "")
+	assert.NoError(t, err)
+
+	fetched, err := tm.GetTask(created.TaskID)
+	assert.NoError(t, err)
+	assert.Equal(t, TaskCompleted, fetched.Status)
+	assert.NotNil(t, fetched.Result)
+	assert.Equal(t, "success", fetched.Result["output"])
+}
+
+// TestTaskManager_CancelTask cancels a freshly created task and checks that
+// its stored status becomes TaskCancelled.
+func TestTaskManager_CancelTask(t *testing.T) {
+	tm := NewTaskManager(&TaskConfig{
+		DefaultTimeout:     5000,
+		MaxConcurrentTasks: 10,
+		CleanupInterval:    0,
+	})
+	defer tm.Stop()
+
+	created, err := tm.CreateTask("agent-1", "agent-2", "test_task", map[string]any{}, 0)
+	assert.NoError(t, err)
+
+	err = tm.CancelTask(created.TaskID)
+	assert.NoError(t, err)
+
+	fetched, err := tm.GetTask(created.TaskID)
+	assert.NoError(t, err)
+	assert.Equal(t, TaskCancelled, fetched.Status)
+}
+
+// TestTaskManager_ConcurrentLimit tests the concurrent task limit
+func TestTaskManager_ConcurrentLimit(t *testing.T) {
+ config := &TaskConfig{
+ DefaultTimeout: 5000,
+ MaxConcurrentTasks: 3, // Small limit for testing
+ CleanupInterval: 0,
+ }
+
+ tm := NewTaskManager(config)
+ defer tm.Stop()
+
+ // Create tasks up to the limit
+ for i := 0; i < 3; i++ {
+ _, err := tm.CreateTask("agent-1", "agent-2", "test_task",
map[string]any{}, 0)
+ assert.NoError(t, err)
+ }
+
+ // Try to create one more task (should fail)
+ _, err := tm.CreateTask("agent-1", "agent-2", "test_task",
map[string]any{}, 0)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "maximum concurrent tasks limit
reached")
+}
+
+// TestTaskManager_GetTaskNotFound checks that GetTask on an empty manager
+// returns a "task not found" error.
+func TestTaskManager_GetTaskNotFound(t *testing.T) {
+	tm := NewTaskManager(&TaskConfig{
+		DefaultTimeout:     5000,
+		MaxConcurrentTasks: 10,
+		CleanupInterval:    0,
+	})
+	defer tm.Stop()
+
+	_, lookupErr := tm.GetTask("non-existent-task-id")
+	assert.Error(t, lookupErr)
+	assert.Contains(t, lookupErr.Error(), "task not found")
+}
+
+// TestTaskManager_ListTasks creates two tasks and checks that ListTasks
+// returns exactly those two, keyed by their task IDs.
+func TestTaskManager_ListTasks(t *testing.T) {
+	config := &TaskConfig{
+		DefaultTimeout:     5000,
+		MaxConcurrentTasks: 10,
+		CleanupInterval:    0,
+	}
+
+	tm := NewTaskManager(config)
+	defer tm.Stop()
+
+	// Abort on a creation failure: the returned task is nil in that case and
+	// dereferencing it below would panic instead of reporting the real cause.
+	task1, err := tm.CreateTask("agent-1", "agent-2", "task1", map[string]any{}, 0)
+	if err != nil {
+		t.Fatalf("creating task1: %v", err)
+	}
+	task2, err := tm.CreateTask("agent-1", "agent-3", "task2", map[string]any{}, 0)
+	if err != nil {
+		t.Fatalf("creating task2: %v", err)
+	}
+
+	// List all tasks
+	tasks := tm.ListTasks()
+	assert.Equal(t, 2, len(tasks))
+	assert.NotNil(t, tasks[task1.TaskID])
+	assert.NotNil(t, tasks[task2.TaskID])
+}
+
+// TestTaskManager_Cleanup creates a task with a 100 ms timeout, waits 150 ms,
+// runs a cleanup pass by hand and checks that the task is still retrievable
+// but marked failed with a timeout error.
+func TestTaskManager_Cleanup(t *testing.T) {
+	tm := NewTaskManager(&TaskConfig{
+		DefaultTimeout:     100, // very short timeout
+		MaxConcurrentTasks: 10,
+		CleanupInterval:    50,  // background cleanup runs frequently
+		TaskRetention:      100, // finished tasks are kept only briefly
+	})
+	defer tm.Stop()
+
+	created, err := tm.CreateTask("agent-1", "agent-2", "test_task", map[string]any{}, 100)
+	assert.NoError(t, err)
+
+	// Let the task outlive its timeout.
+	time.Sleep(150 * time.Millisecond)
+
+	// Run a pass explicitly instead of relying on the ticker's timing.
+	tm.cleanupExpiredTasks()
+
+	expiredTask, err := tm.GetTask(created.TaskID)
+	assert.NoError(t, err)
+	assert.Equal(t, TaskFailed, expiredTask.Status)
+	assert.Contains(t, expiredTask.Error, "timeout")
+}
+
+// TestTaskManager_GetTaskCount checks that GetTaskCount starts at zero and
+// grows by one with every successfully created task.
+func TestTaskManager_GetTaskCount(t *testing.T) {
+	config := &TaskConfig{
+		DefaultTimeout:     5000,
+		MaxConcurrentTasks: 10,
+		CleanupInterval:    0,
+	}
+
+	tm := NewTaskManager(config)
+	defer tm.Stop()
+
+	assert.Equal(t, 0, tm.GetTaskCount())
+
+	// Check each creation explicitly so a failed CreateTask is reported as
+	// such rather than as a confusing count mismatch.
+	if _, err := tm.CreateTask("agent-1", "agent-2", "task1", map[string]any{}, 0); err != nil {
+		t.Fatalf("creating task1: %v", err)
+	}
+	assert.Equal(t, 1, tm.GetTaskCount())
+
+	if _, err := tm.CreateTask("agent-1", "agent-3", "task2", map[string]any{}, 0); err != nil {
+		t.Fatalf("creating task2: %v", err)
+	}
+	assert.Equal(t, 2, tm.GetTaskCount())
+}
diff --git a/pkg/filter/a2a/types.go b/pkg/filter/a2a/types.go
new file mode 100644
index 00000000..a2aefeab
--- /dev/null
+++ b/pkg/filter/a2a/types.go
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package a2a
+
+// JSONRPCRequest represents a JSON-RPC 2.0 request.
+//
+// Params is omitted from the encoded JSON when nil. ID is untyped: code in
+// this package sets it both to an integer and to a string such as
+// "msg_<millis>".
+type JSONRPCRequest struct {
+ JSONRPC string `json:"jsonrpc"`
+ Method string `json:"method"`
+ Params any `json:"params,omitempty"`
+ ID any `json:"id"`
+}
+
+// JSONRPCResponse represents a JSON-RPC 2.0 response.
+//
+// Result and Error are both optional in the encoded form; a non-nil Error is
+// treated as a failed call by forwardMessageToAgent.
+type JSONRPCResponse struct {
+ JSONRPC string `json:"jsonrpc"`
+ Result any `json:"result,omitempty"`
+ Error *RPCError `json:"error,omitempty"`
+ ID any `json:"id"`
+}
+
+// RPCError represents a JSON-RPC error object: a numeric code, a message and
+// optional extra data (omitted from the JSON when nil).
+type RPCError struct {
+ Code int `json:"code"`
+ Message string `json:"message"`
+ Data any `json:"data,omitempty"`
+}
+
+// AgentStatus represents the status of an agent. It is a string type, so it
+// is encoded in JSON as a plain string.
+type AgentStatus string
+
+// Typed agent status values. The underlying strings come from the
+// AgentStatus* constants declared elsewhere in this package.
+const (
+ StatusOnline AgentStatus = AgentStatusOnline
+ StatusOffline AgentStatus = AgentStatusOffline
+ StatusBusy AgentStatus = AgentStatusBusy
+)
+
+// TaskStatus represents the status of a task. It is a string type, so it is
+// encoded in JSON as a plain string.
+type TaskStatus string
+
+// Typed task status values. The underlying strings come from the TaskStatus*
+// constants declared elsewhere in this package. The task manager's cleanup
+// pass treats TaskPending and TaskRunning as still active (subject to
+// timeout) and TaskCompleted, TaskFailed and TaskCancelled as finished
+// (subject to retention).
+const (
+ TaskPending TaskStatus = TaskStatusPending
+ TaskRunning TaskStatus = TaskStatusRunning
+ TaskCompleted TaskStatus = TaskStatusCompleted
+ TaskFailed TaskStatus = TaskStatusFailed
+ TaskCancelled TaskStatus = TaskStatusCancelled
+)
+
+// AgentInfo represents information about an agent.
+//
+// Endpoint is the URL that forwardMessageToAgent POSTs JSON-RPC requests to.
+// Description, Capabilities and Metadata are omitted from the JSON when
+// empty.
+type AgentInfo struct {
+ AgentID string `json:"agent_id"`
+ Name string `json:"name"`
+ Version string `json:"version"`
+ Description string `json:"description,omitempty"`
+ Endpoint string `json:"endpoint"`
+ Status AgentStatus `json:"status"`
+ Capabilities []Capability `json:"capabilities,omitempty"`
+ Metadata map[string]any `json:"metadata,omitempty"`
+}
+
+// Capability represents a capability that an agent provides. Only Name is
+// always present in the JSON; every other field is omitted when empty.
+type Capability struct {
+ Name string `json:"name"`
+ Description string `json:"description,omitempty"`
+ InputTypes []string `json:"input_types,omitempty"`
+ OutputTypes []string `json:"output_types,omitempty"`
+ Tags []string `json:"tags,omitempty"`
+ Parameters []Parameter `json:"parameters,omitempty"`
+}
+
+// Parameter represents a capability parameter. Required is always encoded
+// (including false); Description and Default are omitted when empty.
+type Parameter struct {
+ Name string `json:"name"`
+ Type string `json:"type"`
+ Description string `json:"description,omitempty"`
+ Required bool `json:"required"`
+ Default any `json:"default,omitempty"`
+}
+
+// Task represents a task that can be executed by an agent.
+//
+// CreatedAt and UpdatedAt are Unix timestamps in milliseconds, as set by the
+// task manager. Timeout is a duration in milliseconds measured from
+// CreatedAt: the manager's cleanup pass fails a pending or running task once
+// it is older than Timeout. Result, Error and Timeout are omitted from the
+// JSON when empty.
+type Task struct {
+ TaskID string `json:"task_id"`
+ From string `json:"from"`
+ To string `json:"to"`
+ Type string `json:"type"`
+ Content map[string]any `json:"content"`
+ Status TaskStatus `json:"status"`
+ Result map[string]any `json:"result,omitempty"`
+ Error string `json:"error,omitempty"`
+ CreatedAt int64 `json:"created_at"`
+ UpdatedAt int64 `json:"updated_at"`
+ Timeout int64 `json:"timeout,omitempty"`
+}
+
+// Message represents a message sent between agents. It is the Params payload
+// of the "a2a.receive_message" request built by forwardMessageToAgent.
+//
+// NOTE(review): Timestamp is presumably Unix milliseconds like the Task
+// timestamps; confirm against the code that constructs messages.
+type Message struct {
+ MessageID string `json:"message_id"`
+ From string `json:"from"`
+ To string `json:"to"`
+ Type string `json:"type"`
+ Content map[string]any `json:"content"`
+ Timestamp int64 `json:"timestamp"`
+}
+
+// PingRequest represents a ping request. Timestamp is optional and omitted
+// from the JSON when zero.
+type PingRequest struct {
+ Timestamp int64 `json:"timestamp,omitempty"`
+}
+
+// PingResponse represents a ping response; all three fields are always
+// encoded.
+type PingResponse struct {
+ Status string `json:"status"`
+ AgentID string `json:"agent_id"`
+ Timestamp int64 `json:"timestamp"`
+}
+
+// DiscoverAgentsRequest represents a request to discover agents. Every field
+// is optional and omitted from the JSON when empty.
+type DiscoverAgentsRequest struct {
+ Capabilities []string `json:"capabilities,omitempty"`
+ Tags []string `json:"tags,omitempty"`
+ Status AgentStatus `json:"status,omitempty"`
+ Filters map[string]string `json:"filters,omitempty"`
+}
+
+// DiscoverAgentsResponse represents the response to an agent discovery
+// request. Agents has no omitempty tag, so a nil slice is encoded as null.
+type DiscoverAgentsResponse struct {
+ Agents []AgentInfo `json:"agents"`
+}
+
+// CreateTaskRequest represents a request to create a task. Timeout is
+// optional; TaskManager.CreateTask substitutes its configured default for a
+// timeout that is zero or negative.
+type CreateTaskRequest struct {
+ To string `json:"to"`
+ Type string `json:"type"`
+ Content map[string]any `json:"content"`
+ Timeout int64 `json:"timeout,omitempty"`
+}
+
+// CreateTaskResponse represents the response to a create task request: the
+// ID of the task and its status.
+type CreateTaskResponse struct {
+ TaskID string `json:"task_id"`
+ Status TaskStatus `json:"status"`
+}
+
+// GetTaskStatusRequest represents a request to get task status, identified by
+// task ID.
+type GetTaskStatusRequest struct {
+ TaskID string `json:"task_id"`
+}
+
+// GetTaskStatusResponse represents the response to a get task status request.
+// The task is embedded by value under the "task" key.
+type GetTaskStatusResponse struct {
+ Task Task `json:"task"`
+}
+
+// UpdateTaskRequest represents a request to update a task. Only TaskID is
+// always encoded; Status, Result and Error are omitted when empty.
+type UpdateTaskRequest struct {
+ TaskID string `json:"task_id"`
+ Status TaskStatus `json:"status,omitempty"`
+ Result map[string]any `json:"result,omitempty"`
+ Error string `json:"error,omitempty"`
+}
+
+// UpdateTaskResponse represents the response to an update task request.
+// Message is optional and omitted when empty.
+type UpdateTaskResponse struct {
+ Success bool `json:"success"`
+ Message string `json:"message,omitempty"`
+}
+
+// SendMessageRequest represents a request to send a message: the target
+// agent ID, a message type and the content.
+type SendMessageRequest struct {
+ To string `json:"to"`
+ Type string `json:"type"`
+ Content map[string]any `json:"content"`
+}
+
+// SendMessageResponse represents the response to a send message request.
+// Message is optional and omitted when empty.
+type SendMessageResponse struct {
+ MessageID string `json:"message_id"`
+ Success bool `json:"success"`
+ Message string `json:"message,omitempty"`
+}
+
+// BroadcastMessageResponse represents the response to a broadcast message
+// request. FailedAgents is omitted from the JSON when empty; MessageIDs is
+// always encoded.
+type BroadcastMessageResponse struct {
+ TotalAgents int `json:"total_agents"`
+ SuccessCount int `json:"success_count"`
+ FailedAgents []string `json:"failed_agents,omitempty"`
+ MessageIDs []string `json:"message_ids"`
+}
diff --git a/pkg/pluginregistry/registry.go b/pkg/pluginregistry/registry.go
index 7fc267ea..2c7a6070 100644
--- a/pkg/pluginregistry/registry.go
+++ b/pkg/pluginregistry/registry.go
@@ -29,6 +29,7 @@ import (
_ "github.com/apache/dubbo-go-pixiu/pkg/cluster/retry/countbased"
_
"github.com/apache/dubbo-go-pixiu/pkg/cluster/retry/exponentialbackoff"
_ "github.com/apache/dubbo-go-pixiu/pkg/cluster/retry/noretry"
+ _ "github.com/apache/dubbo-go-pixiu/pkg/filter/a2a"
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/accesslog"
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/auth/jwt"
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/auth/mcp"