Updated Branches: refs/heads/master af57d4824 -> 8ee2fb1f1
Added logic to wait until ports active before sending the instance activated event in cartridge agent Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/dfd9743e Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/dfd9743e Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/dfd9743e Branch: refs/heads/master Commit: dfd9743e65d8e8f82ba2a4e4a99823b662622ec6 Parents: 94a940d Author: Imesh Gunaratne <[email protected]> Authored: Wed Dec 18 22:09:56 2013 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Wed Dec 18 22:09:56 2013 +0530 ---------------------------------------------------------------------- .../subscriber/CartridgeAgentConstants.java | 1 + .../cartridge/agent/event/subscriber/Main.java | 165 +++++++++++++------ 2 files changed, 116 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/dfd9743e/products/cartridge-agent/modules/event-subscriber/src/main/java/org/apache/stratos/cartridge/agent/event/subscriber/CartridgeAgentConstants.java ---------------------------------------------------------------------- diff --git a/products/cartridge-agent/modules/event-subscriber/src/main/java/org/apache/stratos/cartridge/agent/event/subscriber/CartridgeAgentConstants.java b/products/cartridge-agent/modules/event-subscriber/src/main/java/org/apache/stratos/cartridge/agent/event/subscriber/CartridgeAgentConstants.java index 610ddcf..383fcc7 100644 --- a/products/cartridge-agent/modules/event-subscriber/src/main/java/org/apache/stratos/cartridge/agent/event/subscriber/CartridgeAgentConstants.java +++ b/products/cartridge-agent/modules/event-subscriber/src/main/java/org/apache/stratos/cartridge/agent/event/subscriber/CartridgeAgentConstants.java @@ -33,4 +33,5 @@ public class CartridgeAgentConstants implements Serializable{ public static final String NETWORK_PARTITION_ID = "NETWORK_PARTITION_ID"; public static final String PARTITION_ID = "PARTITION_ID"; public static final String MEMBER_ID = "MEMBER_ID"; + public static final String PORTS = "PORTS"; } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/dfd9743e/products/cartridge-agent/modules/event-subscriber/src/main/java/org/apache/stratos/cartridge/agent/event/subscriber/Main.java ---------------------------------------------------------------------- diff --git a/products/cartridge-agent/modules/event-subscriber/src/main/java/org/apache/stratos/cartridge/agent/event/subscriber/Main.java b/products/cartridge-agent/modules/event-subscriber/src/main/java/org/apache/stratos/cartridge/agent/event/subscriber/Main.java index 188e77b..2914eec 100644 --- a/products/cartridge-agent/modules/event-subscriber/src/main/java/org/apache/stratos/cartridge/agent/event/subscriber/Main.java +++ b/products/cartridge-agent/modules/event-subscriber/src/main/java/org/apache/stratos/cartridge/agent/event/subscriber/Main.java @@ -19,10 +19,15 @@ package org.apache.stratos.cartridge.agent.event.subscriber; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.broker.publish.EventPublisher; @@ -35,68 +40,128 @@ import org.apache.stratos.messaging.util.Constants; * Event publisher main class. */ public class Main { - - private static final Log log = LogFactory.getLog(Main.class); + + private static final Log log = LogFactory.getLog(Main.class); public static void main(String[] args) { - - log.info("Strating cartridge agent event subscriber"); - - System.setProperty(CartridgeAgentConstants.JNDI_PROPERTIES_DIR, args[0]); - System.setProperty(CartridgeAgentConstants.PARAM_FILE_PATH, args[1]); - + + log.info("Strating cartridge agent event subscriber"); + + System.setProperty(CartridgeAgentConstants.JNDI_PROPERTIES_DIR, args[0]); + System.setProperty(CartridgeAgentConstants.PARAM_FILE_PATH, args[1]); + //initialting the subscriber TopicSubscriber subscriber = new TopicSubscriber(Constants.ARTIFACT_SYNCHRONIZATION_TOPIC); subscriber.setMessageListener(new ArtifactListener()); Thread tsubscriber = new Thread(subscriber); - tsubscriber.start(); - - // - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - log.info("Sending member started event"); - // Send member activated event - InstanceStartedEvent event = new InstanceStartedEvent( + tsubscriber.start(); + + // + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + log.info("Sending member started event"); + // Send member activated event + InstanceStartedEvent event = new InstanceStartedEvent( LaunchParamsUtil.readParamValueFromPayload(CartridgeAgentConstants.SERVICE_NAME), LaunchParamsUtil.readParamValueFromPayload(CartridgeAgentConstants.CLUSTER_ID), LaunchParamsUtil.readParamValueFromPayload(CartridgeAgentConstants.NETWORK_PARTITION_ID), LaunchParamsUtil.readParamValueFromPayload(CartridgeAgentConstants.PARTITION_ID), LaunchParamsUtil.readParamValueFromPayload(CartridgeAgentConstants.MEMBER_ID)); EventPublisher publisher = new EventPublisher(Constants.INSTANCE_STATUS_TOPIC); - publisher.publish(event); - log.info("Member started event is sent"); - - String repoURL = LaunchParamsUtil - .readParamValueFromPayload("GIT_REPO"); - - if ("null".equals(repoURL) || repoURL == null) { - log.info(" No git repo for this cartridge "); - InstanceActivatedEvent instanceActivatedEvent = new InstanceActivatedEvent( - LaunchParamsUtil - .readParamValueFromPayload(CartridgeAgentConstants.SERVICE_NAME), - LaunchParamsUtil - .readParamValueFromPayload(CartridgeAgentConstants.CLUSTER_ID), - LaunchParamsUtil - .readParamValueFromPayload(CartridgeAgentConstants.NETWORK_PARTITION_ID), - LaunchParamsUtil - .readParamValueFromPayload(CartridgeAgentConstants.PARTITION_ID), - LaunchParamsUtil - .readParamValueFromPayload(CartridgeAgentConstants.MEMBER_ID)); - EventPublisher instanceStatusPublisher = new EventPublisher( - Constants.INSTANCE_STATUS_TOPIC); - instanceStatusPublisher.publish(instanceActivatedEvent); - log.info(" Instance status published. No git repo "); - } - - // Start periodical file checker task - // TODO -- start this thread only if this node configured as a commit true node - ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + publisher.publish(event); + log.info("Member started event is sent"); + + String repoURL = LaunchParamsUtil + .readParamValueFromPayload("GIT_REPO"); + + if ("null".equals(repoURL) || repoURL == null) { + log.info(" No git repo for this cartridge"); + waitForPortsTobeActive(); + InstanceActivatedEvent instanceActivatedEvent = new InstanceActivatedEvent( + LaunchParamsUtil + .readParamValueFromPayload(CartridgeAgentConstants.SERVICE_NAME), + LaunchParamsUtil + .readParamValueFromPayload(CartridgeAgentConstants.CLUSTER_ID), + LaunchParamsUtil + .readParamValueFromPayload(CartridgeAgentConstants.NETWORK_PARTITION_ID), + LaunchParamsUtil + .readParamValueFromPayload(CartridgeAgentConstants.PARTITION_ID), + LaunchParamsUtil + .readParamValueFromPayload(CartridgeAgentConstants.MEMBER_ID)); + EventPublisher instanceStatusPublisher = new EventPublisher( + Constants.INSTANCE_STATUS_TOPIC); + instanceStatusPublisher.publish(instanceActivatedEvent); + log.info(" Instance status published. No git repo "); + } + + // Start periodical file checker task + // TODO -- start this thread only if this node configured as a commit true node + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleWithFixedDelay(new RepositoryFileListener(), 0, 10, TimeUnit.SECONDS); - + } - + + private static void waitForPortsTobeActive() { + long portCheckTimeOut = 1000 * 60 * 10; + String portCheckTimeOutStr = System.getProperty("port.check.timeout"); + if (StringUtils.isNotBlank(portCheckTimeOutStr)) { + portCheckTimeOut = Integer.parseInt(portCheckTimeOutStr); + } + if (log.isInfoEnabled()) { + log.info("Port check timeout: " + portCheckTimeOut); + } + + String ports = LaunchParamsUtil.readParamValueFromPayload(CartridgeAgentConstants.PORTS); + if (StringUtils.isBlank(ports)) { + throw new RuntimeException("No ports found"); + } + String[] portsArray = ports.split(","); + + long startTime = System.currentTimeMillis(); + boolean active = false; + while (!active) { + for (String port : portsArray) { + Socket socket = null; + try { + if (log.isInfoEnabled()) { + log.info("Checking port " + port); + } + SocketAddress httpSockaddr = new InetSocketAddress("localhost", Integer.parseInt(port)); + socket = new Socket(); + socket.connect(httpSockaddr, 5000); + active = true; + if (log.isInfoEnabled()) { + log.info(String.format("Port %s is active", port)); + } + } catch (Exception e) { + active = false; + if (log.isInfoEnabled()) { + log.info(String.format("Port %s is not active", port)); + } + break; + } finally { + if (socket != null) { + try { + socket.close(); + } catch (IOException e) { + } + } + } + } + long endTime = System.currentTimeMillis(); + long duration = endTime - startTime; + if (duration > portCheckTimeOut) { + return; + } + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + } + } + } + }
