changes to the existing classes based on new design
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/2499ddb5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/2499ddb5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/2499ddb5 Branch: refs/heads/master Commit: 2499ddb53adf763345c0ec8c74d65a000fee0614 Parents: ede7a1d Author: Nirmal Fernando <[email protected]> Authored: Mon Mar 10 15:07:19 2014 +0530 Committer: Nirmal Fernando <[email protected]> Committed: Mon Mar 10 15:07:19 2014 +0530 ---------------------------------------------------------------------- .../stratos/cartridge/agent/CartridgeAgent.java | 282 ++----------------- .../synchronizer/RepositoryInformation.java | 1 - .../config/CartridgeAgentConfiguration.java | 112 +++++++- .../publisher/CartridgeAgentEventPublisher.java | 1 - .../agent/util/CartridgeAgentConstants.java | 8 +- .../cartridge/agent/util/ExtensionUtils.java | 2 +- 6 files changed, 145 insertions(+), 261 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2499ddb5/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java index c735355..67537ec 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java @@ -1,30 +1,11 @@ package org.apache.stratos.cartridge.agent; -import org.apache.commons.lang3.StringUtils; +import java.util.List; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.cartridge.agent.artifact.deployment.synchronizer.RepositoryInformation; -import org.apache.stratos.cartridge.agent.artifact.deployment.synchronizer.git.impl.GitBasedArtifactRepository; -import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration; -import org.apache.stratos.cartridge.agent.data.publisher.DataPublisherConfiguration; -import org.apache.stratos.cartridge.agent.data.publisher.exception.DataPublisherException; -import org.apache.stratos.cartridge.agent.data.publisher.log.LogPublisherManager; -import org.apache.stratos.cartridge.agent.event.publisher.CartridgeAgentEventPublisher; -import org.apache.stratos.cartridge.agent.util.CartridgeAgentConstants; -import org.apache.stratos.cartridge.agent.util.CartridgeAgentUtils; -import org.apache.stratos.cartridge.agent.util.ExtensionUtils; -import org.apache.stratos.messaging.event.Event; -import org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent; -import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupClusterEvent; -import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent; -import org.apache.stratos.messaging.listener.instance.notifier.ArtifactUpdateEventListener; -import org.apache.stratos.messaging.listener.instance.notifier.InstanceCleanupClusterEventListener; -import org.apache.stratos.messaging.listener.instance.notifier.InstanceCleanupMemberEventListener; -import org.apache.stratos.messaging.message.processor.instance.notifier.InstanceNotifierMessageProcessorChain; -import org.apache.stratos.messaging.message.receiver.instance.notifier.InstanceNotifierEventMessageDelegator; -import org.apache.stratos.messaging.message.receiver.instance.notifier.InstanceNotifierEventMessageReceiver; - -import java.util.List; +import org.apache.stratos.cartridge.agent.phase.Phase; +import org.apache.stratos.cartridge.agent.runtime.DataHolder; /** * Cartridge agent runnable. @@ -33,245 +14,36 @@ public class CartridgeAgent implements Runnable { private static final Log log = LogFactory.getLog(CartridgeAgent.class); - private boolean terminated; - @Override public void run() { if(log.isInfoEnabled()) { log.info("Cartridge agent started"); } - - String jndiPropertiesDir = System.getProperty(CartridgeAgentConstants.JNDI_PROPERTIES_DIR); - if(StringUtils.isBlank(jndiPropertiesDir)) { - if(log.isErrorEnabled()){ - log.error(String.format("System property not found: %s", CartridgeAgentConstants.JNDI_PROPERTIES_DIR)); - } - return; - } - - String payloadPath = System.getProperty(CartridgeAgentConstants.PARAM_FILE_PATH); - if(StringUtils.isBlank(payloadPath)) { - if(log.isErrorEnabled()){ - log.error(String.format("System property not found: %s", CartridgeAgentConstants.PARAM_FILE_PATH)); - } - return; - } - - String extensionsDir = System.getProperty(CartridgeAgentConstants.EXTENSIONS_DIR); - if(StringUtils.isBlank(extensionsDir)) { - if(log.isWarnEnabled()){ - log.warn(String.format("System property not found: %s", CartridgeAgentConstants.EXTENSIONS_DIR)); - } - } - - // Start instance notifier listener thread - if(log.isDebugEnabled()) { - log.debug("Starting instance notifier event message receiver thread"); - } - InstanceNotifierMessageProcessorChain processorChain = new InstanceNotifierMessageProcessorChain(); - processorChain.addEventListener(new ArtifactUpdateEventListener() { - @Override - protected void onEvent(Event event) { - onArtifactUpdateEvent((ArtifactUpdatedEvent) event); - } - }); - - processorChain.addEventListener(new InstanceCleanupMemberEventListener() { - @Override - protected void onEvent(Event event) { - String memberIdInPayload = CartridgeAgentConfiguration.getInstance().getMemberId(); - InstanceCleanupMemberEvent instanceCleanupMemberEvent = (InstanceCleanupMemberEvent)event; - if(memberIdInPayload.equals(instanceCleanupMemberEvent.getMemberId())) { - onInstanceCleanupEvent(); - } - } - }); - - processorChain.addEventListener(new InstanceCleanupClusterEventListener() { - @Override - protected void onEvent(Event event) { - String clusterIdInPayload = CartridgeAgentConfiguration.getInstance().getClusterId(); - InstanceCleanupClusterEvent instanceCleanupClusterEvent = (InstanceCleanupClusterEvent)event; - if(clusterIdInPayload.equals(instanceCleanupClusterEvent.getClusterId())) { - onInstanceCleanupEvent(); - } - } - }); - InstanceNotifierEventMessageDelegator messageDelegator = new InstanceNotifierEventMessageDelegator(processorChain); - InstanceNotifierEventMessageReceiver messageReceiver = new InstanceNotifierEventMessageReceiver(messageDelegator); - Thread messageReceiverThread = new Thread(messageReceiver); - messageReceiverThread.start(); - - // Wait until message receiver is subscribed to the topic to - // send the instance started event - while (!messageReceiver.isSubscribed()) { - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - } - } - - // Publish instance started event - CartridgeAgentEventPublisher.publishInstanceStartedEvent(); - - // Execute start servers extension - ExtensionUtils.executeStartServersExtension(); - - // Wait for all ports to be active - CartridgeAgentUtils.waitUntilPortsActive("localhost", CartridgeAgentConfiguration.getInstance().getPorts()); - - // Check repo url - String repoUrl = CartridgeAgentConfiguration.getInstance().getRepoUrl(); - if ("null".equals(repoUrl) || StringUtils.isBlank(repoUrl)) { - if(log.isInfoEnabled()) { - log.info("No artifact repository found"); - } - - // Publish instance activated event - CartridgeAgentEventPublisher.publishInstanceActivatedEvent(); - } - - String persistanceMappingsPayload = CartridgeAgentConfiguration.getInstance().getPersistanceMappings(); - if(persistanceMappingsPayload != null) - ExtensionUtils.executeVolumeMountExtension(persistanceMappingsPayload); - // TODO: Start this thread only if this node is configured as a commit true node - // Start periodical file checker task - // ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - // scheduler.scheduleWithFixedDelay(new RepositoryFileListener(), 0, 10, TimeUnit.SECONDS); - - // Keep the thread live until terminated - - // start log publishing - LogPublisherManager logPublisherManager = new LogPublisherManager(); - publishLogs(logPublisherManager); - - while (!terminated) { - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) { - } - } - - logPublisherManager.stop(); - } - - private static void publishLogs (LogPublisherManager logPublisherManager) { - - // check if enabled - if (DataPublisherConfiguration.getInstance().isEnabled()) { - - List<String> logFilePaths = CartridgeAgentConfiguration.getInstance().getLogFilePaths(); - if (logFilePaths == null) { - log.error("No valid log file paths found, no logs will be published"); - return; - - } else { - // initialize the log publishing - try { - logPublisherManager.init(DataPublisherConfiguration.getInstance()); - - } catch (DataPublisherException e) { - log.error("Error occurred in log publisher initialization", e); - return; - } - - // start a log publisher for each file path - for (String logFilePath : logFilePaths) { - try { - logPublisherManager.start(logFilePath); - - } catch (DataPublisherException e) { - log.error("Error occurred in publishing logs ", e); - } - } - } - } - } - - private void onArtifactUpdateEvent(ArtifactUpdatedEvent event) { - ArtifactUpdatedEvent artifactUpdatedEvent = event; - if(log.isInfoEnabled()) { - log.info(String.format("Artifact update event received: %s", artifactUpdatedEvent.toString())); - } - - String clusterIdInPayload = CartridgeAgentConfiguration.getInstance().getClusterId(); - String localRepoPath = CartridgeAgentConfiguration.getInstance().getAppPath(); - String clusterIdInMessage = artifactUpdatedEvent.getClusterId(); - String repoURL = artifactUpdatedEvent.getRepoURL(); - String repoPassword = CartridgeAgentUtils.decryptPassword(artifactUpdatedEvent.getRepoPassword()); - String repoUsername = artifactUpdatedEvent.getRepoUserName(); - String tenantId = artifactUpdatedEvent.getTenantId(); - boolean isMultitenant = CartridgeAgentConfiguration.getInstance().isMultitenant(); - - if(StringUtils.isNotEmpty(repoURL) && (clusterIdInPayload != null) && clusterIdInPayload.equals(clusterIdInMessage)) { - if(log.isInfoEnabled()) { - log.info("Executing git checkout"); - } - RepositoryInformation repoInformation = new RepositoryInformation(); - repoInformation.setRepoUsername(repoUsername); - repoInformation.setRepoPassword(repoPassword); - repoInformation.setRepoUrl(repoURL); - repoInformation.setRepoPath(localRepoPath); - repoInformation.setTenantId(tenantId); - repoInformation.setMultitenant(isMultitenant); - boolean cloneExists = GitBasedArtifactRepository.getInstance().cloneExists(repoInformation); - GitBasedArtifactRepository.getInstance().checkout(repoInformation); - - ExtensionUtils.executeArtifactsUpdatedExtension(); - - if(!cloneExists){ - // Executed git clone, publish instance activated event - CartridgeAgentEventPublisher.publishInstanceActivatedEvent(); - } - - // Start the artifact update task - boolean artifactUpdateEnabled = Boolean.parseBoolean(System.getProperty(CartridgeAgentConstants.ENABLE_ARTIFACT_UPDATE)); - if (artifactUpdateEnabled) { - - long artifactUpdateInterval = 10; - // get update interval - String artifactUpdateIntervalStr = System.getProperty(CartridgeAgentConstants.ARTIFACT_UPDATE_INTERVAL); - - if (artifactUpdateIntervalStr != null && !artifactUpdateIntervalStr.isEmpty()) { - try { - artifactUpdateInterval = Long.parseLong(artifactUpdateIntervalStr); - - } catch (NumberFormatException e) { - log.error("Invalid artifact sync interval specified ", e); - artifactUpdateInterval = 10; - } - } - - log.info("Artifact updating task enabled, update interval: " + artifactUpdateInterval + "s"); - GitBasedArtifactRepository.getInstance().scheduleSyncTask(repoInformation, artifactUpdateInterval); - - } else { - log.info("Artifact updating task disabled"); - } - - } - } - - private void onInstanceCleanupEvent() { - if(log.isInfoEnabled()) { - log.info("Executing cleaning up the data in the cartridge instance..."); - } - //sending event on the maintenance mode - CartridgeAgentEventPublisher.publishMaintenanceModeEvent(); - - //cleaning up the cartridge instance's data - ExtensionUtils.executeCleanupExtension(); - if(log.isInfoEnabled()) { - log.info("cleaning up finished in the cartridge instance..."); - } - if(log.isInfoEnabled()) { - log.info("publishing ready to shutdown event..."); - } - //publishing the Ready to shutdown event after performing the cleanup - CartridgeAgentEventPublisher.publishInstanceReadyToShutdownEvent(); + + List<Phase> phases = DataHolder.getInstance().getPhases(); + + // ==== basic flow ==== + // Initializing Phase + // -- StartListeners extension + // -- Instance Started ext. + // Starting Phase + // -- Start Servers ext. + // -- Wait till activate ext. + // -- PersistenceVolumeExtensionExecutor + // Working Phase + // -- Log publisher ext. + // -- keep agent live ext. + // CleanUp Phase + + // execute all the phases of the agent + for (Phase phase : phases) { + phase.execute(); + } + + } public void terminate() { - terminated = true; + DataHolder.getInstance().setTerminated(true); } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2499ddb5/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/artifact/deployment/synchronizer/RepositoryInformation.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/artifact/deployment/synchronizer/RepositoryInformation.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/artifact/deployment/synchronizer/RepositoryInformation.java index 1be78a4..68f04c2 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/artifact/deployment/synchronizer/RepositoryInformation.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/artifact/deployment/synchronizer/RepositoryInformation.java @@ -20,7 +20,6 @@ package org.apache.stratos.cartridge.agent.artifact.deployment.synchronizer; /** - * @author wso2 * */ public class RepositoryInformation { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2499ddb5/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/config/CartridgeAgentConfiguration.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/config/CartridgeAgentConfiguration.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/config/CartridgeAgentConfiguration.java index 622c016..83e887b 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/config/CartridgeAgentConfiguration.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/config/CartridgeAgentConfiguration.java @@ -2,11 +2,16 @@ package org.apache.stratos.cartridge.agent.config; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cartridge.agent.executor.ExtensionExecutor; +import org.apache.stratos.cartridge.agent.phase.Phase; +import org.apache.stratos.cartridge.agent.runtime.DataHolder; import org.apache.stratos.cartridge.agent.util.CartridgeAgentConstants; import org.apache.stratos.cartridge.agent.util.CartridgeAgentUtils; import java.io.File; +import java.lang.reflect.Constructor; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -32,7 +37,6 @@ public class CartridgeAgentConfiguration { private final List<String> logFilePaths; private Map<String, String> parameters; private boolean isMultitenant; - private String persistanceMappings; private CartridgeAgentConfiguration() { parameters = loadParametersFile(); @@ -47,6 +51,9 @@ public class CartridgeAgentConfiguration { ports = readPorts(); logFilePaths = readLogFilePaths(); isMultitenant = readMultitenant(CartridgeAgentConstants.MULTITENANT); + + // load agent's flow configuration and extract Phases and Extensions + loadFlowConfig(); if(log.isInfoEnabled()) { log.info("Cartridge agent configuration initialized"); @@ -63,9 +70,110 @@ public class CartridgeAgentConfiguration { log.debug(String.format("repo-url: %s", repoUrl)); log.debug(String.format("ports: %s", ports.toString())); } + } - private boolean readMultitenant(String multitenant) { + public static List<Phase> loadFlowConfig() { + + File file = new File( + System.getProperty(CartridgeAgentConstants.AGENT_FLOW_FILE_PATH)); + + if (!file.exists()) { + String msg = "Cannot find the Agent's flow configuration file at: " + + System.getProperty(CartridgeAgentConstants.AGENT_FLOW_FILE_PATH) + + ". Please set the system property: " + + CartridgeAgentConstants.AGENT_FLOW_FILE_PATH; + log.error(msg); + throw new RuntimeException(msg); + + } + + List<Phase> phases = new ArrayList<Phase>(); + + try { + Scanner scanner = new Scanner(file); + while (scanner.hasNextLine()) { + String line = scanner.nextLine(); + if (line.isEmpty()) { + continue; + } + String[] var = line.split("="); + String key = var[0]; + String val = null; + if (var.length > 1) { + val = var[1]; + } + + if (key.contains("[")) { + // this is a definition of a Phase + try { + // load the class + Constructor<?> c = Class.forName(val).getConstructor( + String.class); + String id = key.substring(1, key.length() - 1); + Phase phase = (Phase) c.newInstance(id); + + phases.add(phase); + + } catch (Exception e) { + String msg = "Failed to load the Phase : " + val; + log.error(msg, e); + throw new RuntimeException(msg, e); + } + } else { + // this is a definition of an Extension + try { + if (phases.size() > 0) { + ExtensionExecutor extension; + + if (val == null) { + + // load the class + Constructor<?> c = Class.forName(key) + .getConstructor(); + extension = (ExtensionExecutor) c.newInstance(); + } else { + // split + String[] values = val + .split(CartridgeAgentConstants.SCRIPT_SEPARATOR); + List<String> valuesList = Arrays.asList(values); + + // load the class + Constructor<?> c = Class.forName(key) + .getConstructor(List.class); + extension = (ExtensionExecutor) c + .newInstance(valuesList); + } + + // add the extracted extension to the + // latest phase + Phase latestPhase = phases.get(phases.size() - 1); + latestPhase.addExtension(extension); + } + + } catch (Exception e) { + String msg = "Failed to load the Extension : " + key; + log.error(msg, e); + throw new RuntimeException(msg, e); + } + } + } + scanner.close(); + } catch (Exception e) { + String msg = "Error while reading the Agent's flow configuration file at: " + + System.getProperty(CartridgeAgentConstants.AGENT_FLOW_FILE_PATH) + + ". Please provide a valid configuration file."; + log.error(msg, e); + throw new RuntimeException(msg, e); + } + + // sets the phases + DataHolder.getInstance().setPhases(phases); + + return phases; + } + + private boolean readMultitenant(String multitenant) { String multitenantStringValue = readParameterValue(multitenant); return Boolean.parseBoolean(multitenantStringValue); } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2499ddb5/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java index 82420c7..a7206ac 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java @@ -41,7 +41,6 @@ public class CartridgeAgentEventPublisher { log.info("Instance started event published"); } - ExtensionUtils.executeInstanceStartedExtension(); } else { if (log.isWarnEnabled()) { log.warn("Instance already started"); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2499ddb5/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentConstants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentConstants.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentConstants.java index 80e3bc2..f0ed4f5 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentConstants.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentConstants.java @@ -22,9 +22,12 @@ package org.apache.stratos.cartridge.agent.util; import java.io.Serializable; public class CartridgeAgentConstants implements Serializable{ - + + private static final long serialVersionUID = -7743412422597491362L; + public static final String JNDI_PROPERTIES_DIR = "jndi.properties.dir"; public static final String PARAM_FILE_PATH = "param.file.path"; + public static final String AGENT_FLOW_FILE_PATH = "agent.flow.file.path"; public static final String EXTENSIONS_DIR = "extensions.dir"; public static final String INSTANCE_STARTED_SH = "instance-started.sh"; @@ -50,4 +53,7 @@ public class CartridgeAgentConstants implements Serializable{ public static final String MULTITENANT = "MULTITENANT"; public static final String ENABLE_ARTIFACT_UPDATE = "enable.artifact.update"; public static final String ARTIFACT_UPDATE_INTERVAL = "artifact.update.interval"; + + public static final String SCRIPT_SEPARATOR = ","; + } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2499ddb5/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/ExtensionUtils.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/ExtensionUtils.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/ExtensionUtils.java index 2a11103..9efc2ca 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/ExtensionUtils.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/ExtensionUtils.java @@ -41,7 +41,7 @@ public class ExtensionUtils { return extensionsDir; } - private static String prepareCommand(String scriptFile) { + public static String prepareCommand(String scriptFile) { String extensionsDir = getExtensionsDir(); return (extensionsDir.endsWith(File.separator)) ? extensionsDir + scriptFile:
