Copilot commented on code in PR #5206:
URL: https://github.com/apache/eventmesh/pull/5206#discussion_r2379245383


##########
examples/a2a-agent-client/src/main/java/org/apache/eventmesh/examples/a2a/SimpleA2AAgent.java:
##########
@@ -0,0 +1,469 @@
+package org.apache.eventmesh.examples.a2a;
+
+import org.apache.eventmesh.common.utils.JsonUtils;
+import 
org.apache.eventmesh.runtime.core.protocol.a2a.A2AProtocolAdaptor.A2AMessage;
+import 
org.apache.eventmesh.runtime.core.protocol.a2a.A2AProtocolAdaptor.AgentInfo;
+import 
org.apache.eventmesh.runtime.core.protocol.a2a.A2AProtocolAdaptor.MessageMetadata;

Review Comment:
   These imports reference classes that don't exist in the codebase. The 
A2AProtocolAdaptor class in the diff doesn't contain the nested classes 
A2AMessage, AgentInfo, or MessageMetadata, so these imports will cause 
compilation errors.
   ```suggestion
   
   ```



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/a2a/CollaborationManager.java:
##########
@@ -0,0 +1,450 @@
+package org.apache.eventmesh.runtime.core.protocol.a2a;
+
+import 
org.apache.eventmesh.runtime.core.protocol.a2a.A2AProtocolAdaptor.AgentInfo;
+import 
org.apache.eventmesh.runtime.core.protocol.a2a.pubsub.A2APublishSubscribeService;
+import org.apache.eventmesh.runtime.core.protocol.a2a.pubsub.A2ATaskRequest;
+import org.apache.eventmesh.runtime.core.protocol.a2a.pubsub.A2ATaskMessage;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+
+import java.util.Map;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+import java.util.UUID;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A2A Collaboration Manager - Refactored for Publish/Subscribe Model
+ * Manages agent collaboration, task coordination, and workflow orchestration
+ * using EventMesh publish/subscribe infrastructure instead of point-to-point 
calls
+ */
+@Slf4j
+public class CollaborationManager {
+    
+    private static final CollaborationManager INSTANCE = new 
CollaborationManager();
+    private final AgentRegistry agentRegistry = AgentRegistry.getInstance();
+    private final Map<String, CollaborationSession> activeSessions = new 
ConcurrentHashMap<>();
+    private final Map<String, WorkflowDefinition> workflows = new 
ConcurrentHashMap<>();
+    private final ExecutorService collaborationExecutor = 
Executors.newFixedThreadPool(5);
+    
+    // Publish/Subscribe service for EventMesh integration
+    private A2APublishSubscribeService pubSubService;
+    
+    private CollaborationManager() {}
+    
+    /**
+     * Initialize with EventMesh producer for publish/subscribe operations
+     */
+    public void initialize(EventMeshProducer eventMeshProducer) {
+        this.pubSubService = new A2APublishSubscribeService(eventMeshProducer);
+        log.info("CollaborationManager initialized with publish/subscribe 
service");
+    }
+    
+    public static CollaborationManager getInstance() {
+        return INSTANCE;
+    }
+    
+    /**
+     * Start a collaboration session between agents
+     */
+    public String startCollaboration(String workflowId, List<String> agentIds, 
Map<String, Object> parameters) {
+        String sessionId = UUID.randomUUID().toString();
+        
+        WorkflowDefinition workflow = workflows.get(workflowId);
+        if (workflow == null) {
+            throw new IllegalArgumentException("Workflow not found: " + 
workflowId);
+        }
+        
+        // Validate that all required agents are available
+        List<AgentInfo> availableAgents = new ArrayList<>();
+        for (String agentId : agentIds) {
+            AgentInfo agent = agentRegistry.getAgent(agentId);
+            if (agent != null && agentRegistry.isAgentAlive(agentId)) {
+                availableAgents.add(agent);
+            } else {
+                throw new IllegalArgumentException("Agent not available: " + 
agentId);
+            }
+        }
+        
+        CollaborationSession session = new CollaborationSession(sessionId, 
workflow, availableAgents, parameters);
+        activeSessions.put(sessionId, session);
+        
+        // Start workflow execution
+        collaborationExecutor.submit(() -> executeWorkflow(session));
+        
+        System.out.println("Started collaboration session: " + sessionId + " 
with workflow: " + workflowId);
+        return sessionId;
+    }
+    
+    /**
+     * Execute workflow steps
+     */
+    private void executeWorkflow(CollaborationSession session) {
+        try {
+            WorkflowDefinition workflow = session.getWorkflow();
+            List<WorkflowStep> steps = workflow.getSteps();
+            
+            for (int i = 0; i < steps.size(); i++) {
+                WorkflowStep step = steps.get(i);
+                session.setCurrentStep(i);
+                
+                // Execute step
+                boolean stepSuccess = executeStep(session, step);
+                
+                if (!stepSuccess) {
+                    session.setStatus(CollaborationStatus.FAILED);
+                    System.err.println("Workflow step failed: " + 
step.getName());
+                    return;
+                }
+                
+                // Wait for step completion if needed
+                if (step.getWaitForCompletion()) {
+                    boolean completed = waitForStepCompletion(session, step);
+                    if (!completed) {
+                        session.setStatus(CollaborationStatus.TIMEOUT);
+                        return;
+                    }
+                }
+            }
+            
+            session.setStatus(CollaborationStatus.COMPLETED);
+            System.out.println("Workflow completed successfully: " + 
session.getSessionId());
+            
+        } catch (Exception e) {
+            session.setStatus(CollaborationStatus.FAILED);
+            System.err.println("Workflow execution failed: " + e.getMessage());
+        }
+    }
+    
+    /**
+     * Execute a single workflow step using publish/subscribe model
+     */
+    private boolean executeStep(CollaborationSession session, WorkflowStep 
step) {
+        try {
+            if (pubSubService == null) {
+                log.error("Publish/Subscribe service not initialized");
+                return false;
+            }
+            
+            // Create task request for publish/subscribe
+            A2ATaskRequest taskRequest = A2ATaskRequest.builder()
+                .taskType(step.getName())
+                .payload(step.getParameters())
+                .requiredCapabilities(step.getRequiredCapabilities())
+                .priority(A2ATaskMessage.A2ATaskPriority.HIGH)
+                .timeout(step.getTimeout())
+                .maxRetries(step.getRetryCount())
+                .publisherAgent("collaboration-manager")
+                .correlationId(session.getSessionId())
+                .build();
+            
+            // Publish task to EventMesh topic (no specific target agent)
+            CompletableFuture<String> taskFuture = 
pubSubService.publishTask(taskRequest);
+            
+            taskFuture.whenComplete((taskId, throwable) -> {
+                if (throwable == null) {
+                    // Store step context
+                    session.addStepContext(step.getName(), Map.of(
+                        "taskId", taskId,
+                        "startTime", System.currentTimeMillis(),
+                        "published", true
+                    ));
+                    log.info("Step {} published as task {}", step.getName(), 
taskId);
+                } else {
+                    log.error("Failed to publish step {}", step.getName(), 
throwable);
+                    session.addStepContext(step.getName(), Map.of(
+                        "error", throwable.getMessage(),
+                        "failed", true
+                    ));
+                }
+            });
+            
+            return true;
+            
+        } catch (Exception e) {
+            log.error("Error executing step: {}", step.getName(), e);
+            return false;
+        }
+    }
+    
+    /**
+     * Wait for step completion
+     */
+    private boolean waitForStepCompletion(CollaborationSession session, 
WorkflowStep step) {
+        long timeout = step.getTimeout() > 0 ? step.getTimeout() : 30000; // 
Default 30 seconds
+        long startTime = System.currentTimeMillis();
+        
+        while (System.currentTimeMillis() - startTime < timeout) {
+            Map<String, Object> stepContext = 
session.getStepContext(step.getName());
+            if (stepContext != null && stepContext.containsKey("completed")) {
+                return (Boolean) stepContext.get("completed");
+            }
+            
+            try {
+                Thread.sleep(1000); // Check every second
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+        
+        return false; // Timeout
+    }
+    
+    /**
+     * Find suitable agent for workflow step
+     */
+    private AgentInfo findAgentForStep(List<AgentInfo> availableAgents, 
WorkflowStep step) {
+        for (AgentInfo agent : availableAgents) {
+            if (agent.getCapabilities() != null) {
+                for (String capability : agent.getCapabilities()) {
+                    if (step.getRequiredCapabilities().contains(capability)) {
+                        return agent;
+                    }
+                }
+            }
+        }
+        return null;
+    }
+    
+    /**
+     * Create task request message
+     */
+    private A2AMessage createTaskRequest(CollaborationSession session, 
WorkflowStep step, AgentInfo targetAgent) {
+        A2AMessage taskRequest = new A2AMessage();
+        taskRequest.setMessageType("TASK_REQUEST");
+        taskRequest.setSourceAgent(createSystemAgent());
+        taskRequest.setTargetAgent(targetAgent);
+        
+        // Create task payload
+        Map<String, Object> taskPayload = Map.of(
+            "taskId", UUID.randomUUID().toString(),
+            "taskType", step.getName(),
+            "parameters", step.getParameters(),
+            "sessionId", session.getSessionId(),
+            "workflowId", session.getWorkflow().getId(),
+            "stepIndex", session.getCurrentStep(),
+            "constraints", Map.of(
+                "timeout", step.getTimeout(),
+                "priority", "HIGH",
+                "retryCount", step.getRetryCount()
+            )
+        );
+        
+        taskRequest.setPayload(taskPayload);
+        
+        // Add correlation ID for tracking
+        MessageMetadata metadata = new MessageMetadata();
+        metadata.setCorrelationId(session.getSessionId());
+        taskRequest.setMetadata(metadata);
+        
+        return taskRequest;
+    }
+    
+    /**
+     * Create system agent for internal communication
+     */
+    private AgentInfo createSystemAgent() {
+        AgentInfo systemAgent = new AgentInfo();
+        systemAgent.setAgentId("system-collaboration-manager");
+        systemAgent.setAgentType("system");
+        systemAgent.setCapabilities(new String[]{"workflow-orchestration", 
"task-coordination"});
+        return systemAgent;
+    }
+    
+    /**
+     * Handle task response from agent
+     */
+    public void handleTaskResponse(A2AMessage response) {
+        String sessionId = response.getMetadata().getCorrelationId();
+        CollaborationSession session = activeSessions.get(sessionId);
+        
+        if (session == null) {
+            System.err.println("No active session found for response: " + 
sessionId);
+            return;
+        }
+        
+        // Update step context
+        Map<String, Object> responseData = (Map<String, Object>) 
response.getPayload();
+        String taskId = (String) responseData.get("taskId");
+        
+        // Find step by task ID
+        for (Map.Entry<String, Map<String, Object>> entry : 
session.getStepContexts().entrySet()) {
+            Map<String, Object> stepContext = entry.getValue();
+            if (taskId.equals(stepContext.get("taskId"))) {
+                stepContext.put("completed", true);
+                stepContext.put("result", responseData.get("result"));
+                stepContext.put("endTime", System.currentTimeMillis());
+                break;
+            }
+        }
+    }
+    
+    /**
+     * Register workflow definition
+     */
+    public void registerWorkflow(WorkflowDefinition workflow) {
+        workflows.put(workflow.getId(), workflow);
+        System.out.println("Registered workflow: " + workflow.getId());
+    }
+    
+    /**
+     * Get collaboration session status
+     */
+    public CollaborationStatus getSessionStatus(String sessionId) {
+        CollaborationSession session = activeSessions.get(sessionId);
+        return session != null ? session.getStatus() : null;
+    }
+    
+    /**
+     * Cancel collaboration session
+     */
+    public boolean cancelSession(String sessionId) {
+        CollaborationSession session = activeSessions.remove(sessionId);
+        if (session != null) {
+            session.setStatus(CollaborationStatus.CANCELLED);
+            
+            // Notify agents about cancellation
+            A2AMessage cancelMessage = new A2AMessage();
+            cancelMessage.setMessageType("COLLABORATION_CANCELLED");
+            cancelMessage.setSourceAgent(createSystemAgent());
+            cancelMessage.setPayload(Map.of("sessionId", sessionId, "reason", 
"Cancelled by user"));
+            
+            for (AgentInfo agent : session.getAvailableAgents()) {
+                cancelMessage.setTargetAgent(agent);
+                messageRouter.routeMessage(cancelMessage);

Review Comment:
   The variable `messageRouter` is referenced but not declared or initialized 
in this class. This will cause a compilation error.



##########
eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/A2AProtocolAdaptor.java:
##########
@@ -0,0 +1,242 @@
+package org.apache.eventmesh.protocol.a2a;
+
+import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
+import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
+import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A2A (Agent-to-Agent) Protocol Adaptor
+ * Handles agent-to-agent communication protocol for EventMesh
+ */
+@Slf4j
+public class A2AProtocolAdaptor implements 
ProtocolAdaptor<ProtocolTransportObject> {
+
+    private static final String PROTOCOL_TYPE = "A2A";
+    private static final String PROTOCOL_VERSION = "2.0";
+    
+    private volatile boolean initialized = false;
+
+    @Override
+    public void initialize() {
+        if (!initialized) {
+            log.info("Initializing A2A Protocol Adaptor");
+            initialized = true;
+        }
+    }
+
+    @Override
+    public void destroy() {
+        if (initialized) {
+            log.info("Destroying A2A Protocol Adaptor");
+            initialized = false;
+        }
+    }
+
+    @Override
+    public String getProtocolType() {
+        return PROTOCOL_TYPE;
+    }
+
+    @Override
+    public String getVersion() {
+        return PROTOCOL_VERSION;
+    }
+
+    @Override
+    public int getPriority() {
+        return 80; // High priority for A2A protocol
+    }
+
+    @Override
+    public boolean supportsBatchProcessing() {
+        return true;
+    }
+
+    @Override
+    public Set<String> getCapabilities() {
+        Set<String> capabilities = new HashSet<>();
+        capabilities.add("agent-communication");
+        capabilities.add("workflow-orchestration");
+        capabilities.add("state-sync");
+        return capabilities;
+    }
+
+    @Override
+    public boolean isValid(ProtocolTransportObject cloudEvent) {
+        if (cloudEvent == null) {
+            return false;
+        }
+        
+        try {
+            String content = cloudEvent.toString();
+            return content.contains("A2A") || 
+                   content.contains("agent") || 
+                   content.contains("collaboration");
+        } catch (Exception e) {
+            log.warn("Failed to validate A2A message: {}", e.getMessage());
+            return false;
+        }
+    }
+
+    @Override
+    public CloudEvent toCloudEvent(ProtocolTransportObject 
protocolTransportObject) throws ProtocolHandleException {
+        try {
+            if (!isValid(protocolTransportObject)) {
+                throw new ProtocolHandleException("Invalid A2A protocol 
message");
+            }
+
+            String content = protocolTransportObject.toString();
+            Map<String, Object> messageData = parseA2AMessage(content);
+            
+            CloudEventBuilder builder = CloudEventBuilder.v1()
+                .withId(generateMessageId())
+                .withSource(URI.create("eventmesh-a2a"))
+                .withType("org.apache.eventmesh.protocol.a2a.message")
+                .withData(content.getBytes(StandardCharsets.UTF_8));
+
+            // Add A2A specific extensions (must follow CloudEvent extension 
naming rules)
+            builder.withExtension("protocol", PROTOCOL_TYPE);
+            builder.withExtension("protocolversion", PROTOCOL_VERSION);
+            
+            if (messageData.containsKey("messageType")) {
+                builder.withExtension("messagetype", 
messageData.get("messageType").toString());
+            }
+            
+            if (messageData.containsKey("sourceAgent")) {
+                builder.withExtension("sourceagent", 
messageData.get("sourceAgent").toString());
+            }
+
+            return builder.build();
+            
+        } catch (Exception e) {
+            throw new ProtocolHandleException("Failed to convert A2A message 
to CloudEvent", e);
+        }
+    }
+
+    @Override
+    public List<CloudEvent> toBatchCloudEvent(ProtocolTransportObject 
protocolTransportObject) throws ProtocolHandleException {
+        try {
+            CloudEvent singleEvent = toCloudEvent(protocolTransportObject);
+            return Collections.singletonList(singleEvent);
+        } catch (Exception e) {
+            throw new ProtocolHandleException("Failed to convert A2A batch 
message to CloudEvents", e);
+        }
+    }
+
+    @Override
+    public ProtocolTransportObject fromCloudEvent(CloudEvent cloudEvent) 
throws ProtocolHandleException {
+        try {
+            if (cloudEvent == null) {
+                throw new ProtocolHandleException("CloudEvent cannot be null");
+            }
+
+            // Extract A2A message data from CloudEvent
+            byte[] data = cloudEvent.getData() != null ? 
cloudEvent.getData().toBytes() : new byte[0];
+            String content = new String(data, StandardCharsets.UTF_8);
+
+            // Create a simple protocol transport object wrapper
+            return new A2AProtocolTransportObject(content, cloudEvent);
+            
+        } catch (Exception e) {
+            throw new ProtocolHandleException("Failed to convert CloudEvent to 
A2A message", e);
+        }
+    }
+
+    private Map<String, Object> parseA2AMessage(String content) {
+        try {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> result = JsonUtils.parseObject(content, 
Map.class);
+            return result;
+        } catch (Exception e) {
+            log.debug("Failed to parse as JSON, treating as plain text: {}", 
e.getMessage());
+            Map<String, Object> result = new HashMap<>();
+            result.put("content", content);
+            result.put("messageType", "TEXT");
+            return result;
+        }
+    }
+
+    private String generateMessageId() {
+        return "a2a-" + System.currentTimeMillis() + "-" + Math.random();
+    }
+
+    /**
+     * Simple wrapper for A2A protocol transport object
+     */
+    public static class A2AProtocolTransportObject implements 
ProtocolTransportObject {
+        private final String content;
+        private final CloudEvent sourceCloudEvent;
+
+        public A2AProtocolTransportObject(String content, CloudEvent 
sourceCloudEvent) {
+            this.content = content;
+            this.sourceCloudEvent = sourceCloudEvent;
+        }
+
+        @Override
+        public String toString() {
+            return content;
+        }
+
+        public CloudEvent getSourceCloudEvent() {
+            return sourceCloudEvent;
+        }
+    }
+
+    /**
+     * Agent information container
+     */
+    public static class AgentInfo {
+        private String agentId;
+        private String agentType;
+        private String[] capabilities;
+        private Map<String, Object> metadata;
+
+        public String getAgentId() {
+            return agentId;
+        }
+
+        public void setAgentId(String agentId) {
+            this.agentId = agentId;
+        }
+
+        public String getAgentType() {
+            return agentType;
+        }
+
+        public void setAgentType(String agentType) {
+            this.agentType = agentType;
+        }
+
+        public String[] getCapabilities() {
+            return capabilities;
+        }
+
+        public void setCapabilities(String[] capabilities) {
+            this.capabilities = capabilities;
+        }
+
+        public Map<String, Object> getMetadata() {
+            return metadata;
+        }
+
+        public void setMetadata(Map<String, Object> metadata) {
+            this.metadata = metadata;
+        }
+    }

Review Comment:
   [nitpick] The AgentInfo class is defined as a nested class within 
A2AProtocolAdaptor, but it's also referenced in the runtime package. Consider 
extracting this as a shared class in a common package to avoid circular 
dependencies and improve maintainability.
   ```suggestion
        * @deprecated Moved to org.apache.eventmesh.common.AgentInfo
        */
       // public static class AgentInfo { ... } // Removed. Use 
org.apache.eventmesh.common.AgentInfo instead.
   ```



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/a2a/CollaborationManager.java:
##########
@@ -0,0 +1,450 @@
+package org.apache.eventmesh.runtime.core.protocol.a2a;
+
+import 
org.apache.eventmesh.runtime.core.protocol.a2a.A2AProtocolAdaptor.AgentInfo;
+import 
org.apache.eventmesh.runtime.core.protocol.a2a.pubsub.A2APublishSubscribeService;
+import org.apache.eventmesh.runtime.core.protocol.a2a.pubsub.A2ATaskRequest;
+import org.apache.eventmesh.runtime.core.protocol.a2a.pubsub.A2ATaskMessage;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+
+import java.util.Map;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+import java.util.UUID;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A2A Collaboration Manager - Refactored for Publish/Subscribe Model
+ * Manages agent collaboration, task coordination, and workflow orchestration
+ * using EventMesh publish/subscribe infrastructure instead of point-to-point 
calls
+ */
+@Slf4j
+public class CollaborationManager {
+    
+    private static final CollaborationManager INSTANCE = new 
CollaborationManager();
+    private final AgentRegistry agentRegistry = AgentRegistry.getInstance();
+    private final Map<String, CollaborationSession> activeSessions = new 
ConcurrentHashMap<>();
+    private final Map<String, WorkflowDefinition> workflows = new 
ConcurrentHashMap<>();
+    private final ExecutorService collaborationExecutor = 
Executors.newFixedThreadPool(5);
+    
+    // Publish/Subscribe service for EventMesh integration
+    private A2APublishSubscribeService pubSubService;
+    
+    private CollaborationManager() {}
+    
+    /**
+     * Initialize with EventMesh producer for publish/subscribe operations
+     */
+    public void initialize(EventMeshProducer eventMeshProducer) {
+        this.pubSubService = new A2APublishSubscribeService(eventMeshProducer);
+        log.info("CollaborationManager initialized with publish/subscribe 
service");
+    }
+    
+    public static CollaborationManager getInstance() {
+        return INSTANCE;
+    }
+    
+    /**
+     * Start a collaboration session between agents
+     */
+    public String startCollaboration(String workflowId, List<String> agentIds, 
Map<String, Object> parameters) {
+        String sessionId = UUID.randomUUID().toString();
+        
+        WorkflowDefinition workflow = workflows.get(workflowId);
+        if (workflow == null) {
+            throw new IllegalArgumentException("Workflow not found: " + 
workflowId);
+        }
+        
+        // Validate that all required agents are available
+        List<AgentInfo> availableAgents = new ArrayList<>();
+        for (String agentId : agentIds) {
+            AgentInfo agent = agentRegistry.getAgent(agentId);
+            if (agent != null && agentRegistry.isAgentAlive(agentId)) {
+                availableAgents.add(agent);
+            } else {
+                throw new IllegalArgumentException("Agent not available: " + 
agentId);
+            }
+        }
+        
+        CollaborationSession session = new CollaborationSession(sessionId, 
workflow, availableAgents, parameters);
+        activeSessions.put(sessionId, session);
+        
+        // Start workflow execution
+        collaborationExecutor.submit(() -> executeWorkflow(session));
+        
+        System.out.println("Started collaboration session: " + sessionId + " 
with workflow: " + workflowId);
+        return sessionId;
+    }
+    
+    /**
+     * Execute workflow steps
+     */
+    private void executeWorkflow(CollaborationSession session) {
+        try {
+            WorkflowDefinition workflow = session.getWorkflow();
+            List<WorkflowStep> steps = workflow.getSteps();
+            
+            for (int i = 0; i < steps.size(); i++) {
+                WorkflowStep step = steps.get(i);
+                session.setCurrentStep(i);
+                
+                // Execute step
+                boolean stepSuccess = executeStep(session, step);
+                
+                if (!stepSuccess) {
+                    session.setStatus(CollaborationStatus.FAILED);
+                    System.err.println("Workflow step failed: " + 
step.getName());
+                    return;
+                }
+                
+                // Wait for step completion if needed
+                if (step.getWaitForCompletion()) {
+                    boolean completed = waitForStepCompletion(session, step);
+                    if (!completed) {
+                        session.setStatus(CollaborationStatus.TIMEOUT);
+                        return;
+                    }
+                }
+            }
+            
+            session.setStatus(CollaborationStatus.COMPLETED);
+            System.out.println("Workflow completed successfully: " + 
session.getSessionId());
+            
+        } catch (Exception e) {
+            session.setStatus(CollaborationStatus.FAILED);
+            System.err.println("Workflow execution failed: " + e.getMessage());
+        }
+    }
+    
+    /**
+     * Execute a single workflow step using publish/subscribe model
+     */
+    private boolean executeStep(CollaborationSession session, WorkflowStep 
step) {
+        try {
+            if (pubSubService == null) {
+                log.error("Publish/Subscribe service not initialized");
+                return false;
+            }
+            
+            // Create task request for publish/subscribe
+            A2ATaskRequest taskRequest = A2ATaskRequest.builder()
+                .taskType(step.getName())
+                .payload(step.getParameters())
+                .requiredCapabilities(step.getRequiredCapabilities())
+                .priority(A2ATaskMessage.A2ATaskPriority.HIGH)
+                .timeout(step.getTimeout())
+                .maxRetries(step.getRetryCount())
+                .publisherAgent("collaboration-manager")
+                .correlationId(session.getSessionId())
+                .build();
+            
+            // Publish task to EventMesh topic (no specific target agent)
+            CompletableFuture<String> taskFuture = 
pubSubService.publishTask(taskRequest);
+            
+            taskFuture.whenComplete((taskId, throwable) -> {
+                if (throwable == null) {
+                    // Store step context
+                    session.addStepContext(step.getName(), Map.of(
+                        "taskId", taskId,
+                        "startTime", System.currentTimeMillis(),
+                        "published", true
+                    ));
+                    log.info("Step {} published as task {}", step.getName(), 
taskId);
+                } else {
+                    log.error("Failed to publish step {}", step.getName(), 
throwable);
+                    session.addStepContext(step.getName(), Map.of(
+                        "error", throwable.getMessage(),
+                        "failed", true
+                    ));
+                }
+            });
+            
+            return true;
+            
+        } catch (Exception e) {
+            log.error("Error executing step: {}", step.getName(), e);
+            return false;
+        }
+    }
+    
+    /**
+     * Wait for step completion
+     */
+    private boolean waitForStepCompletion(CollaborationSession session, 
WorkflowStep step) {
+        long timeout = step.getTimeout() > 0 ? step.getTimeout() : 30000; // 
Default 30 seconds
+        long startTime = System.currentTimeMillis();
+        
+        while (System.currentTimeMillis() - startTime < timeout) {
+            Map<String, Object> stepContext = 
session.getStepContext(step.getName());
+            if (stepContext != null && stepContext.containsKey("completed")) {
+                return (Boolean) stepContext.get("completed");
+            }
+            
+            try {
+                Thread.sleep(1000); // Check every second
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+        
+        return false; // Timeout
+    }
+    
+    /**
+     * Find suitable agent for workflow step
+     */
+    private AgentInfo findAgentForStep(List<AgentInfo> availableAgents, 
WorkflowStep step) {
+        for (AgentInfo agent : availableAgents) {
+            if (agent.getCapabilities() != null) {
+                for (String capability : agent.getCapabilities()) {
+                    if (step.getRequiredCapabilities().contains(capability)) {
+                        return agent;
+                    }
+                }
+            }
+        }
+        return null;
+    }
+    
+    /**
+     * Create task request message
+     */
+    private A2AMessage createTaskRequest(CollaborationSession session, 
WorkflowStep step, AgentInfo targetAgent) {
+        A2AMessage taskRequest = new A2AMessage();

Review Comment:
   The A2AMessage class is referenced but not imported or defined in this file. 
This will cause compilation errors. The class needs to be imported or defined 
before it can be used here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to