Implementing log data publishing (continued).
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/1b482b53 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/1b482b53 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/1b482b53 Branch: refs/heads/master Commit: 1b482b53a567cd4c0d8168e1830cbab56d2ce9bf Parents: b5e03ff Author: Isuru <[email protected]> Authored: Tue Feb 11 20:17:16 2014 +0530 Committer: Isuru <[email protected]> Committed: Tue Feb 11 20:17:16 2014 +0530 ---------------------------------------------------------------------- .../stratos/cartridge/agent/CartridgeAgent.java | 36 ++++++- .../apache/stratos/cartridge/agent/Main.java | 33 ------ .../config/CartridgeAgentConfiguration.java | 15 +++ .../agent/data/publisher/DataPublisher.java | 19 ++-- .../publisher/DataPublisherConfiguration.java | 90 +++++++++++++++- .../agent/data/publisher/log/Constants.java | 29 ++++++ .../publisher/log/FileBasedLogPublisher.java | 88 ++++++++-------- .../agent/data/publisher/log/LogPublisher.java | 16 ++- .../data/publisher/log/LogPublisherManager.java | 102 +++++++++++++++++++ .../publisher/HealthStatisticsNotifier.java | 2 +- .../publisher/HealthStatisticsReader.java | 3 +- .../agent/util/CartridgeAgentConstants.java | 1 + .../agent/util/CartridgeAgentUtils.java | 12 +-- 13 files changed, 351 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java index 
6de3f9c..c1fb542 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java @@ -6,6 +6,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.stratos.cartridge.agent.artifact.deployment.synchronizer.RepositoryInformation; import org.apache.stratos.cartridge.agent.artifact.deployment.synchronizer.git.impl.GitBasedArtifactRepository; import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration; +import org.apache.stratos.cartridge.agent.data.publisher.DataPublisherConfiguration; +import org.apache.stratos.cartridge.agent.data.publisher.exception.DataPublisherException; +import org.apache.stratos.cartridge.agent.data.publisher.log.LogPublisherManager; import org.apache.stratos.cartridge.agent.event.publisher.CartridgeAgentEventPublisher; import org.apache.stratos.cartridge.agent.util.CartridgeAgentConstants; import org.apache.stratos.cartridge.agent.util.CartridgeAgentUtils; @@ -21,6 +24,8 @@ import org.apache.stratos.messaging.message.processor.instance.notifier.Instance import org.apache.stratos.messaging.message.receiver.instance.notifier.InstanceNotifierEventMessageDelegator; import org.apache.stratos.messaging.message.receiver.instance.notifier.InstanceNotifierEventMessageReceiver; +import java.util.List; + /** * Cartridge agent runnable. 
*/ @@ -113,7 +118,7 @@ public class CartridgeAgent implements Runnable { ExtensionUtils.executeStartServersExtension(); // Wait for all ports to be active - CartridgeAgentUtils.waitUntilPortsActive(); + CartridgeAgentUtils.waitUntilPortsActive("localhost", CartridgeAgentConfiguration.getInstance().getPorts()); // Check repo url String repoUrl = CartridgeAgentConfiguration.getInstance().getRepoUrl(); @@ -126,13 +131,42 @@ public class CartridgeAgent implements Runnable { CartridgeAgentEventPublisher.publishInstanceActivatedEvent(); } + // TODO: Start this thread only if this node is configured as a commit true node // Start periodical file checker task // ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); // scheduler.scheduleWithFixedDelay(new RepositoryFileListener(), 0, 10, TimeUnit.SECONDS); // Keep the thread live until terminated + + // start log publishing + LogPublisherManager logPublisherManager = new LogPublisherManager(); + // check if enabled + if (DataPublisherConfiguration.getInstance().isEnabled()) { + + List<String> logFilePaths = CartridgeAgentConfiguration.getInstance().getLogFilePaths(); + if (logFilePaths == null) { + log.error("No valid log file paths found, no logs will be published"); + + } else { + // initialize the log publishing + logPublisherManager.init(DataPublisherConfiguration.getInstance()); + + // start a log publisher for each file path + for (String logFilePath : logFilePaths) { + try { + logPublisherManager.start(logFilePath); + + } catch (DataPublisherException e) { + log.error("Error occurred in publishing logs ", e); + } + } + } + } + while (!terminated); + + logPublisherManager.stop(); } private void onArtifactUpdateEvent(ArtifactUpdatedEvent event) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java index 5ca2196..5f00281 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java @@ -24,14 +24,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.log4j.PropertyConfigurator; import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration; import org.apache.stratos.cartridge.agent.config.configurator.JndiConfigurator; -import org.apache.stratos.cartridge.agent.data.publisher.log.FileBasedLogPublisher; -import org.wso2.carbon.databridge.commons.Attribute; -import org.wso2.carbon.databridge.commons.AttributeType; -import org.wso2.carbon.databridge.commons.StreamDefinition; -import org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException; - -import java.util.ArrayList; -import java.util.List; /** * Cartridge agent main class. @@ -67,31 +59,6 @@ public class Main { } } - StreamDefinition streamDefinition = null; - - try { - streamDefinition = new StreamDefinition("log.publisher." 
+ "php.isuruh.domain", "1.0.0"); - - } catch (MalformedStreamDefinitionException e) { - throw new RuntimeException(e); - } - - streamDefinition.setDescription("Apache Stratos Instance Log Publisher"); - - List<Attribute> metaDataDefinition = new ArrayList<Attribute>(); - metaDataDefinition.add(new Attribute("ipAddress", AttributeType.STRING)); - metaDataDefinition.add(new Attribute("nodeId", AttributeType.STRING)); - - List<Attribute> payloadDataDefinition = new ArrayList<Attribute>(); - payloadDataDefinition.add(new Attribute("logEvent", AttributeType.STRING)); - - streamDefinition.setMetaData(metaDataDefinition); - streamDefinition.setPayloadData(payloadDataDefinition); - - FileBasedLogPublisher fileBasedLogPublisher = new FileBasedLogPublisher(null, streamDefinition, "", ""); - fileBasedLogPublisher.initialize(); - fileBasedLogPublisher.publish(null); - } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/config/CartridgeAgentConfiguration.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/config/CartridgeAgentConfiguration.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/config/CartridgeAgentConfiguration.java index 673ce2d..db3d3cd 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/config/CartridgeAgentConfiguration.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/config/CartridgeAgentConfiguration.java @@ -27,6 +27,7 @@ public class CartridgeAgentConfiguration { private final String appPath; private final String repoUrl; private final List<Integer> ports; + private final List<String> logFilePaths; private boolean isMultitenant; private 
CartridgeAgentConfiguration() { @@ -39,6 +40,7 @@ public class CartridgeAgentConfiguration { appPath = readParameterValue(CartridgeAgentConstants.APP_PATH); repoUrl = readParameterValue(CartridgeAgentConstants.REPO_URL); ports = readPorts(); + logFilePaths = readLogFilePaths(); isMultitenant = readMultitenant(CartridgeAgentConstants.MULTITENANT); if(log.isInfoEnabled()) { @@ -114,6 +116,15 @@ public class CartridgeAgentConfiguration { return ports; } + private List<String> readLogFilePaths () { + + String logFileStr = readParameterValue(CartridgeAgentConstants.LOG_FILE_PATHS); + if (logFileStr == null || logFileStr.isEmpty()) { + return null; + } + return CartridgeAgentUtils.splitUsingTokenizer(logFileStr.trim(), "|"); + } + public String getServiceName() { return serviceName; } @@ -150,6 +161,10 @@ public class CartridgeAgentConfiguration { return ports; } + public List<String> getLogFilePaths() { + return logFilePaths; + } + public boolean isMultitenant() { return isMultitenant; } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisher.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisher.java index 8069de9..4601a74 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisher.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisher.java @@ -29,7 +29,6 @@ import org.wso2.carbon.databridge.commons.Event; import org.wso2.carbon.databridge.commons.StreamDefinition; import java.util.Date; -import 
java.util.HashMap; public abstract class DataPublisher { @@ -50,26 +49,28 @@ public abstract class DataPublisher { public void initialize () { AgentConfiguration agentConfiguration = new AgentConfiguration(); - System.setProperty("javax.net.ssl.trustStore", "/home/isuru/wso2/S2/apache/stratos/alpha/wso2bam-2.4.0/repository/resources/security/client-truststore.jks"); - System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon"); + //System.setProperty("javax.net.ssl.trustStore", "/home/isuru/wso2/S2/apache/stratos/alpha/wso2bam-2.4.0/repository/resources/security/client-truststore.jks"); + //System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon"); Agent agent = new Agent(agentConfiguration); - dataPublisher = new AsyncDataPublisher("tcp://10.217.105.68:7611", "admin", "admin", agent); + dataPublisher = new AsyncDataPublisher(dataPublisherConfig.getMonitoringServerUrl(), dataPublisherConfig.getAdminUsername(), + dataPublisherConfig.getAdminPassword(), agent); if (!dataPublisher.isStreamDefinitionAdded(streamDefinition.getName(), streamDefinition.getVersion())) { dataPublisher.addStreamDefinition(streamDefinition); } setDataPublisherInitialized(true); + + log.info("DataPublisher initialized"); } - public void publish (DataContext dataContext) { + protected void publish (DataContext dataContext) { Event event = new Event(); event.setTimeStamp(new Date().getTime()); - event.setMetaData(new Object[]{"10.217.105.68", "php.isuruh.domain-1dadfdas"}); - event.setPayloadData(new Object[]{"test log event"}); - event.setArbitraryDataMap(new HashMap<String, String>()); + event.setMetaData(dataContext.getMetaData()); + event.setPayloadData(dataContext.getPayloadData()); try { dataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event); @@ -81,7 +82,7 @@ public abstract class DataPublisher { } } - public void terminate () { + protected void terminate () { dataPublisher.stop(); } 
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisherConfiguration.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisherConfiguration.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisherConfiguration.java index af1e59d..f0c4596 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisherConfiguration.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisherConfiguration.java @@ -19,9 +19,26 @@ package org.apache.stratos.cartridge.agent.data.publisher; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + public class DataPublisherConfiguration { + private static final Log log = LogFactory.getLog(DataPublisherConfiguration.class); + + private static final String ENABLE_DATA_PUBLISHER = "enable.data.publisher"; + private static final String MONITORING_SERVER_IP = "monitoring.server.ip"; + private static final String MONITORING_SERVER_PORT = "monitoring.server.port"; + private static final String MONITORING_SERVER_SECURE_PORT = "monitoring.server.secure.port"; + private static final String MONITORING_SERVER_ADMIN_USERNAME = "monitoring.server.admin.username"; + private static final String MONITORING_SERVER_ADMIN_PASSWORD = "monitoring.server.admin.password"; + + private boolean enable; private String monitoringServerUrl; + private String monitoringServerIp; + private String monitoringServerPort; + private String monitoringServerSecurePort; private String 
adminUsername; private String adminPassword; private static volatile DataPublisherConfiguration dataPublisherConfiguration; @@ -31,7 +48,46 @@ public class DataPublisherConfiguration { } private void readConfig () { - //TODO: read and store + + String enabled = System.getProperty(ENABLE_DATA_PUBLISHER); + + setEnable(Boolean.parseBoolean(enabled)); + if (!isEnabled()) { + log.info("Data Publisher disabled"); + // disabled; no need to read other parameters + return; + } + + log.info("Data publishing is enabled"); + + monitoringServerIp = System.getProperty(MONITORING_SERVER_IP); + if(StringUtils.isBlank(monitoringServerIp)) { + throw new RuntimeException("System property not found: " + MONITORING_SERVER_IP); + } + + monitoringServerPort = System.getProperty(MONITORING_SERVER_PORT); + if(StringUtils.isBlank(monitoringServerPort)) { + throw new RuntimeException("System property not found: " + MONITORING_SERVER_PORT); + } + + monitoringServerUrl = "tcp://" + monitoringServerIp + ":" + monitoringServerPort; + + monitoringServerSecurePort = System.getProperty(MONITORING_SERVER_SECURE_PORT); + if(StringUtils.isBlank(monitoringServerSecurePort)) { + throw new RuntimeException("System property not found: " + MONITORING_SERVER_SECURE_PORT); + } + + adminUsername = System.getProperty(MONITORING_SERVER_ADMIN_USERNAME); + if(StringUtils.isBlank(adminUsername)) { + throw new RuntimeException("System property not found: " + MONITORING_SERVER_ADMIN_USERNAME); + } + + adminPassword = System.getProperty(MONITORING_SERVER_ADMIN_PASSWORD); + if(StringUtils.isBlank(adminPassword)) { + throw new RuntimeException("System property not found: " + MONITORING_SERVER_ADMIN_PASSWORD); + } + + log.info("Data Publisher configuration initialized"); } public static DataPublisherConfiguration getInstance () { @@ -70,4 +126,36 @@ public class DataPublisherConfiguration { public void setAdminPassword(String adminPassword) { this.adminPassword = adminPassword; } + + public boolean isEnabled() { + 
return enable; + } + + public void setEnable(boolean enable) { + this.enable = enable; + } + + public String getMonitoringServerPort() { + return monitoringServerPort; + } + + public void setMonitoringServerPort(String monitoringServerPort) { + this.monitoringServerPort = monitoringServerPort; + } + + public String getMonitoringServerSecurePort() { + return monitoringServerSecurePort; + } + + public void setMonitoringServerSecurePort(String monitoringServerSecurePort) { + this.monitoringServerSecurePort = monitoringServerSecurePort; + } + + public String getMonitoringServerIp() { + return monitoringServerIp; + } + + public void setMonitoringServerIp(String monitoringServerIp) { + this.monitoringServerIp = monitoringServerIp; + } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/Constants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/Constants.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/Constants.java new file mode 100644 index 0000000..8a721fc --- /dev/null +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/Constants.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cartridge.agent.data.publisher.log; + +public class Constants { + + public static String LOG_PUBLISHER_STREAM_PREFIX = "logs."; + public static String LOG_PUBLISHER_STREAM_VERSION = "1.0.0"; + public static String TAIL_COMMAND = "tail -n 100 -F "; + public static String MEMBER_ID = "memberId"; + public static String LOG_EVENT = "logEvent"; +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/FileBasedLogPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/FileBasedLogPublisher.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/FileBasedLogPublisher.java index 8aaf461..f75aa27 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/FileBasedLogPublisher.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/FileBasedLogPublisher.java @@ -32,59 +32,65 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; -public class FileBasedLogPublisher extends LogPublisher { +public class FileBasedLogPublisher extends LogPublisher implements Runnable { 
private static final Log log = LogFactory.getLog(FileBasedLogPublisher.class); - private String memberIp; - private String memberId; + private ExecutorService executorService; + private Process process; + private Scanner scanner; - public FileBasedLogPublisher(DataPublisherConfiguration dataPublisherConfig, StreamDefinition streamDefinition, String memberIp, String memberId) { - super(dataPublisherConfig, streamDefinition); - this.memberIp = memberIp; - this.memberId = memberId; + public FileBasedLogPublisher(DataPublisherConfiguration dataPublisherConfig, StreamDefinition streamDefinition, String filePath, String memberId) { + + super(dataPublisherConfig, streamDefinition, filePath, memberId); + this.executorService = Executors.newSingleThreadExecutor(new FileBasedLogPublisherTaskThreadFactory(filePath)); } - public void tailFileAndPublishLogs (String filePath) { - ExecutorService executorService = Executors.newSingleThreadExecutor(new FileBasedLogPublisherTaskThreadFactory(filePath)); - executorService.submit(new FileBasedLogPublisherTask(filePath, this, memberId, memberIp)); + public void start () { + executorService.submit(this); } - private class FileBasedLogPublisherTask implements Runnable { + public void stop () { - private String memberIp; - private String memberId; - private String filePath; - private FileBasedLogPublisher fileBasedLogPublisher; + // close the resources + try { + process.getInputStream().close(); + + } catch (IOException e) { + log.error("Error in closing [tail -F] input stream", e); + } + scanner.close(); + process.destroy(); - public FileBasedLogPublisherTask (String filePath, FileBasedLogPublisher fileBasedLogPublisher, String memberId, String memberIp) { + executorService.shutdownNow(); + terminate(); - this.filePath = filePath; - this.memberId = memberId; - this.memberIp = memberIp; - this.fileBasedLogPublisher = fileBasedLogPublisher; + log.info("Terminated log publisher for file: " + filePath); + } + + @Override + public void run() 
{ + + Runtime r = Runtime.getRuntime(); + try { + process = r.exec(Constants.TAIL_COMMAND + filePath); + + } catch (IOException e) { + log.error("Error tailing file ", e); + throw new RuntimeException(e); } - @Override - public void run() { - - Runtime r = Runtime.getRuntime(); - Process p; - try { - p = r.exec("tail -F " + filePath); - - } catch (IOException e) { - log.error("Error tailing file ", e); - throw new RuntimeException(e); - } - - Scanner s = new Scanner(p.getInputStream()); - while (s.hasNextLine()) { - DataContext dataContext = new DataContext(); - dataContext.setCorrelationData(null); - dataContext.setMetaData(new Object[] {memberIp, memberId}); - dataContext.setPayloadData(new Object[] {s.nextLine()}); - fileBasedLogPublisher.publish(dataContext); - } + log.info("Starting log publisher for file: " + filePath + ", thread: " + Thread.currentThread().getName()); + + scanner = new Scanner(process.getInputStream()); + while (scanner.hasNextLine()) { + + DataContext dataContext = new DataContext(); + // set the relevant data + dataContext.setCorrelationData(null); + dataContext.setMetaData(new Object[] {memberId}); + dataContext.setPayloadData(new Object[] {scanner.nextLine()}); + // publish data + publish(dataContext); } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisher.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisher.java index 12dee6f..306f109 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisher.java +++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisher.java @@ -25,13 +25,25 @@ import org.apache.stratos.cartridge.agent.data.publisher.DataPublisher; import org.apache.stratos.cartridge.agent.data.publisher.DataPublisherConfiguration; import org.wso2.carbon.databridge.commons.StreamDefinition; -public class LogPublisher extends DataPublisher { +public abstract class LogPublisher extends DataPublisher { private static final Log log = LogFactory.getLog(LogPublisher.class); - public LogPublisher (DataPublisherConfiguration dataPublisherConfig, StreamDefinition streamDefinition) { + protected String memberId; + protected String filePath; + + public LogPublisher (DataPublisherConfiguration dataPublisherConfig, StreamDefinition streamDefinition, String filePath, String memberId) { + super(dataPublisherConfig, streamDefinition); + this.filePath = filePath; + this.memberId = memberId; + } + + public void start () { + } + public void stop () { + } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisherManager.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisherManager.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisherManager.java new file mode 100644 index 0000000..1049fdd --- /dev/null +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisherManager.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cartridge.agent.data.publisher.log; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration; +import org.apache.stratos.cartridge.agent.data.publisher.DataPublisherConfiguration; +import org.apache.stratos.cartridge.agent.data.publisher.exception.DataPublisherException; +import org.apache.stratos.cartridge.agent.util.CartridgeAgentUtils; +import org.wso2.carbon.databridge.commons.Attribute; +import org.wso2.carbon.databridge.commons.AttributeType; +import org.wso2.carbon.databridge.commons.StreamDefinition; +import org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +public class LogPublisherManager { + + private static final Log log = LogFactory.getLog(LogPublisherManager.class); + + private static DataPublisherConfiguration dataPublisherConfig = null; + private static StreamDefinition streamDefinition = null; + private static List<LogPublisher> fileBasedLogPublishers = new ArrayList<LogPublisher>(); + + public void init (DataPublisherConfiguration dataPublisherConfig) { + + this.dataPublisherConfig = dataPublisherConfig; + 
+ List<Integer> ports = new ArrayList<Integer>(); + ports.add(Integer.parseInt(dataPublisherConfig.getMonitoringServerPort())); + ports.add(Integer.parseInt(dataPublisherConfig.getMonitoringServerSecurePort())); + + // wait till monitoring server ports are active + CartridgeAgentUtils.waitUntilPortsActive(dataPublisherConfig.getMonitoringServerIp(), ports); + + // stream definition identifier = {log.publisher.<cluster id>} + try { + streamDefinition = new StreamDefinition(Constants.LOG_PUBLISHER_STREAM_PREFIX + CartridgeAgentConfiguration.getInstance().getClusterId(), + Constants.LOG_PUBLISHER_STREAM_VERSION); + + } catch (MalformedStreamDefinitionException e) { + throw new RuntimeException(e); + } + + streamDefinition.setDescription("Apache Stratos Instance Log Publisher"); + + List<Attribute> metaDataDefinition = new ArrayList<Attribute>(); + metaDataDefinition.add(new Attribute(Constants.MEMBER_ID, AttributeType.STRING)); + + List<Attribute> payloadDataDefinition = new ArrayList<Attribute>(); + payloadDataDefinition.add(new Attribute(Constants.LOG_EVENT, AttributeType.STRING)); + + streamDefinition.setMetaData(metaDataDefinition); + streamDefinition.setPayloadData(payloadDataDefinition); + } + + public void start (String filePath) throws DataPublisherException { + + File logFile = new File (filePath); + if (!logFile.exists() || !logFile.canRead() || logFile.isDirectory()) { + throw new DataPublisherException("Unable to read the file at path " + filePath); + } + + LogPublisher fileBasedLogPublisher = new FileBasedLogPublisher(dataPublisherConfig, streamDefinition, filePath, + CartridgeAgentConfiguration.getInstance().getMemberId()); + + fileBasedLogPublisher.initialize(); + fileBasedLogPublisher.start(); + + // add instance to list + fileBasedLogPublishers.add(fileBasedLogPublisher); + } + + public void stop () { + + if (dataPublisherConfig.isEnabled()) { + for (LogPublisher fileBasedLogPublisher : fileBasedLogPublishers) { + fileBasedLogPublisher.stop(); + } + } 
+ } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java index 9d54a87..1dae393 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java @@ -54,7 +54,7 @@ public class HealthStatisticsNotifier implements Runnable { } if (statsPublisher.isEnabled()) { - if(!CartridgeAgentUtils.checkPortsActive()) { + if(!CartridgeAgentUtils.checkPortsActive("localhost", CartridgeAgentConfiguration.getInstance().getPorts())) { if(log.isInfoEnabled()) { log.info("Publishing ports not open event"); } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsReader.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsReader.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsReader.java index 1b186be..ce7e639 100644 --- 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsReader.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsReader.java @@ -19,6 +19,7 @@ package org.apache.stratos.cartridge.agent.statistics.publisher; +import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration; import org.apache.stratos.cartridge.agent.util.CartridgeAgentUtils; import java.lang.management.ManagementFactory; @@ -42,6 +43,6 @@ public class HealthStatisticsReader { } public static boolean allPortsActive() { - return CartridgeAgentUtils.checkPortsActive(); + return CartridgeAgentUtils.checkPortsActive("localhost", CartridgeAgentConfiguration.getInstance().getPorts()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentConstants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentConstants.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentConstants.java index 9914f6f..0a54453 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentConstants.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentConstants.java @@ -42,6 +42,7 @@ public class CartridgeAgentConstants implements Serializable{ public static final String MEMBER_ID = "MEMBER_ID"; public static final String REPO_URL = "REPO_URL"; public static final String PORTS = "PORTS"; + public static final String LOG_FILE_PATHS = 
"LOG_FILE_PATHS"; public static final String MEMORY_CONSUMPTION = "memory_consumption"; public static final String LOAD_AVERAGE = "load_average"; public static final String PORTS_NOT_OPEN = "ports_not_open"; http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentUtils.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentUtils.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentUtils.java index 3650bb3..7e44cc9 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentUtils.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentUtils.java @@ -46,7 +46,7 @@ public class CartridgeAgentUtils { StringTokenizer tokenizer = new StringTokenizer(string, delimiter); List<String> list = new ArrayList<String>(string.length()); while (tokenizer.hasMoreTokens()) { - list.add(tokenizer.nextToken()); + list.add(tokenizer.nextToken().trim()); } return list; } @@ -71,7 +71,7 @@ public class CartridgeAgentUtils { return decryptPassword; } - public static void waitUntilPortsActive() { + public static void waitUntilPortsActive(String ipAddress, List<Integer> ports) { long portCheckTimeOut = 1000 * 60 * 10; String portCheckTimeOutStr = System.getProperty("port.check.timeout"); if (StringUtils.isNotBlank(portCheckTimeOutStr)) { @@ -87,7 +87,7 @@ public class CartridgeAgentUtils { if(log.isInfoEnabled()) { log.info("Waiting for ports to be active"); } - active = checkPortsActive(); + active = checkPortsActive(ipAddress, ports); long endTime = System.currentTimeMillis(); long duration = endTime - startTime; if 
(duration > portCheckTimeOut) { @@ -100,15 +100,15 @@ public class CartridgeAgentUtils { } } - public static boolean checkPortsActive() { - List<Integer> ports = CartridgeAgentConfiguration.getInstance().getPorts(); + public static boolean checkPortsActive(String ipAddress, List<Integer> ports) { + //List<Integer> ports = CartridgeAgentConfiguration.getInstance().getPorts(); if (ports.size() == 0) { throw new RuntimeException("No ports found"); } for (int port : ports) { Socket socket = null; try { - SocketAddress httpSockaddr = new InetSocketAddress("localhost", port); + SocketAddress httpSockaddr = new InetSocketAddress(ipAddress, port); socket = new Socket(); socket.connect(httpSockaddr, 5000); if (log.isInfoEnabled()) {
