implementing log data publishing contd.

Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/1b482b53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/1b482b53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/1b482b53

Branch: refs/heads/master
Commit: 1b482b53a567cd4c0d8168e1830cbab56d2ce9bf
Parents: b5e03ff
Author: Isuru <[email protected]>
Authored: Tue Feb 11 20:17:16 2014 +0530
Committer: Isuru <[email protected]>
Committed: Tue Feb 11 20:17:16 2014 +0530

----------------------------------------------------------------------
 .../stratos/cartridge/agent/CartridgeAgent.java |  36 ++++++-
 .../apache/stratos/cartridge/agent/Main.java    |  33 ------
 .../config/CartridgeAgentConfiguration.java     |  15 +++
 .../agent/data/publisher/DataPublisher.java     |  19 ++--
 .../publisher/DataPublisherConfiguration.java   |  90 +++++++++++++++-
 .../agent/data/publisher/log/Constants.java     |  29 ++++++
 .../publisher/log/FileBasedLogPublisher.java    |  88 ++++++++--------
 .../agent/data/publisher/log/LogPublisher.java  |  16 ++-
 .../data/publisher/log/LogPublisherManager.java | 102 +++++++++++++++++++
 .../publisher/HealthStatisticsNotifier.java     |   2 +-
 .../publisher/HealthStatisticsReader.java       |   3 +-
 .../agent/util/CartridgeAgentConstants.java     |   1 +
 .../agent/util/CartridgeAgentUtils.java         |  12 +--
 13 files changed, 351 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
index 6de3f9c..c1fb542 100644
--- 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
@@ -6,6 +6,9 @@ import org.apache.commons.logging.LogFactory;
 import 
org.apache.stratos.cartridge.agent.artifact.deployment.synchronizer.RepositoryInformation;
 import 
org.apache.stratos.cartridge.agent.artifact.deployment.synchronizer.git.impl.GitBasedArtifactRepository;
 import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration;
+import 
org.apache.stratos.cartridge.agent.data.publisher.DataPublisherConfiguration;
+import 
org.apache.stratos.cartridge.agent.data.publisher.exception.DataPublisherException;
+import 
org.apache.stratos.cartridge.agent.data.publisher.log.LogPublisherManager;
 import 
org.apache.stratos.cartridge.agent.event.publisher.CartridgeAgentEventPublisher;
 import org.apache.stratos.cartridge.agent.util.CartridgeAgentConstants;
 import org.apache.stratos.cartridge.agent.util.CartridgeAgentUtils;
@@ -21,6 +24,8 @@ import 
org.apache.stratos.messaging.message.processor.instance.notifier.Instance
 import 
org.apache.stratos.messaging.message.receiver.instance.notifier.InstanceNotifierEventMessageDelegator;
 import 
org.apache.stratos.messaging.message.receiver.instance.notifier.InstanceNotifierEventMessageReceiver;
 
+import java.util.List;
+
 /**
  * Cartridge agent runnable.
  */
@@ -113,7 +118,7 @@ public class CartridgeAgent implements Runnable {
         ExtensionUtils.executeStartServersExtension();
 
         // Wait for all ports to be active
-        CartridgeAgentUtils.waitUntilPortsActive();
+        CartridgeAgentUtils.waitUntilPortsActive("localhost", 
CartridgeAgentConfiguration.getInstance().getPorts());
 
         // Check repo url
         String repoUrl = 
CartridgeAgentConfiguration.getInstance().getRepoUrl();
@@ -126,13 +131,42 @@ public class CartridgeAgent implements Runnable {
             CartridgeAgentEventPublisher.publishInstanceActivatedEvent();
         }
 
+
         // TODO: Start this thread only if this node is configured as a commit 
true node
         // Start periodical file checker task
         // ScheduledExecutorService scheduler = 
Executors.newScheduledThreadPool(1);
         // scheduler.scheduleWithFixedDelay(new RepositoryFileListener(), 0, 
10, TimeUnit.SECONDS);
 
         // Keep the thread live until terminated
+
+        // start log publishing
+        LogPublisherManager logPublisherManager = new LogPublisherManager();
+        // check if enabled
+        if (DataPublisherConfiguration.getInstance().isEnabled()) {
+
+            List<String> logFilePaths = 
CartridgeAgentConfiguration.getInstance().getLogFilePaths();
+            if (logFilePaths == null) {
+                log.error("No valid log file paths found, no logs will be 
published");
+
+            } else {
+                // initialize the log publishing
+                
logPublisherManager.init(DataPublisherConfiguration.getInstance());
+
+                // start a log publisher for each file path
+                for (String logFilePath : logFilePaths) {
+                    try {
+                        logPublisherManager.start(logFilePath);
+
+                    } catch (DataPublisherException e) {
+                        log.error("Error occurred in publishing logs ", e);
+                    }
+                }
+            }
+        }
+
         while (!terminated);
+
+        logPublisherManager.stop();
     }
 
     private void onArtifactUpdateEvent(ArtifactUpdatedEvent event) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
index 5ca2196..5f00281 100644
--- 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
@@ -24,14 +24,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration;
 import org.apache.stratos.cartridge.agent.config.configurator.JndiConfigurator;
-import 
org.apache.stratos.cartridge.agent.data.publisher.log.FileBasedLogPublisher;
-import org.wso2.carbon.databridge.commons.Attribute;
-import org.wso2.carbon.databridge.commons.AttributeType;
-import org.wso2.carbon.databridge.commons.StreamDefinition;
-import 
org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException;
-
-import java.util.ArrayList;
-import java.util.List;
 
 /**
  * Cartridge agent main class.
@@ -67,31 +59,6 @@ public class Main {
             }
         }
 
-        StreamDefinition streamDefinition = null;
-
-        try {
-            streamDefinition = new StreamDefinition("log.publisher." + 
"php.isuruh.domain", "1.0.0");
-
-        } catch (MalformedStreamDefinitionException e) {
-            throw new RuntimeException(e);
-        }
-
-        streamDefinition.setDescription("Apache Stratos Instance Log 
Publisher");
-
-        List<Attribute> metaDataDefinition = new ArrayList<Attribute>();
-        metaDataDefinition.add(new Attribute("ipAddress", 
AttributeType.STRING));
-        metaDataDefinition.add(new Attribute("nodeId", AttributeType.STRING));
-
-        List<Attribute> payloadDataDefinition = new ArrayList<Attribute>();
-        payloadDataDefinition.add(new Attribute("logEvent", 
AttributeType.STRING));
-
-        streamDefinition.setMetaData(metaDataDefinition);
-        streamDefinition.setPayloadData(payloadDataDefinition);
-
-        FileBasedLogPublisher fileBasedLogPublisher = new 
FileBasedLogPublisher(null, streamDefinition, "", "");
-        fileBasedLogPublisher.initialize();
-        fileBasedLogPublisher.publish(null);
-
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/config/CartridgeAgentConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/config/CartridgeAgentConfiguration.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/config/CartridgeAgentConfiguration.java
index 673ce2d..db3d3cd 100644
--- 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/config/CartridgeAgentConfiguration.java
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/config/CartridgeAgentConfiguration.java
@@ -27,6 +27,7 @@ public class CartridgeAgentConfiguration {
     private final String appPath;
     private final String repoUrl;
     private final List<Integer> ports;
+    private final List<String> logFilePaths;
     private boolean isMultitenant;
 
     private CartridgeAgentConfiguration() {
@@ -39,6 +40,7 @@ public class CartridgeAgentConfiguration {
         appPath = readParameterValue(CartridgeAgentConstants.APP_PATH);
         repoUrl = readParameterValue(CartridgeAgentConstants.REPO_URL);
         ports = readPorts();
+        logFilePaths = readLogFilePaths();
         isMultitenant = readMultitenant(CartridgeAgentConstants.MULTITENANT);
 
         if(log.isInfoEnabled()) {
@@ -114,6 +116,15 @@ public class CartridgeAgentConfiguration {
         return ports;
     }
 
+    private List<String> readLogFilePaths () {
+
+        String logFileStr = 
readParameterValue(CartridgeAgentConstants.LOG_FILE_PATHS);
+        if (logFileStr == null || logFileStr.isEmpty()) {
+            return null;
+        }
+        return CartridgeAgentUtils.splitUsingTokenizer(logFileStr.trim(), "|");
+    }
+
     public String getServiceName() {
         return serviceName;
     }
@@ -150,6 +161,10 @@ public class CartridgeAgentConfiguration {
         return ports;
     }
 
+    public List<String> getLogFilePaths() {
+        return logFilePaths;
+    }
+
        public boolean isMultitenant() {
                return isMultitenant;
        }  

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisher.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisher.java
index 8069de9..4601a74 100644
--- 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisher.java
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisher.java
@@ -29,7 +29,6 @@ import org.wso2.carbon.databridge.commons.Event;
 import org.wso2.carbon.databridge.commons.StreamDefinition;
 
 import java.util.Date;
-import java.util.HashMap;
 
 public abstract class DataPublisher {
 
@@ -50,26 +49,28 @@ public abstract class DataPublisher {
     public void initialize () {
 
         AgentConfiguration agentConfiguration = new AgentConfiguration();
-        System.setProperty("javax.net.ssl.trustStore", 
"/home/isuru/wso2/S2/apache/stratos/alpha/wso2bam-2.4.0/repository/resources/security/client-truststore.jks");
-        System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
+        //System.setProperty("javax.net.ssl.trustStore", 
"/home/isuru/wso2/S2/apache/stratos/alpha/wso2bam-2.4.0/repository/resources/security/client-truststore.jks");
+        //System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
         Agent agent = new Agent(agentConfiguration);
 
-        dataPublisher = new AsyncDataPublisher("tcp://10.217.105.68:7611", 
"admin", "admin", agent);
+        dataPublisher = new 
AsyncDataPublisher(dataPublisherConfig.getMonitoringServerUrl(), 
dataPublisherConfig.getAdminUsername(),
+                dataPublisherConfig.getAdminPassword(), agent);
 
         if (!dataPublisher.isStreamDefinitionAdded(streamDefinition.getName(), 
streamDefinition.getVersion())) {
             dataPublisher.addStreamDefinition(streamDefinition);
         }
 
         setDataPublisherInitialized(true);
+
+        log.info("DataPublisher initialized");
     }
 
-    public void publish (DataContext dataContext) {
+    protected void publish (DataContext dataContext) {
 
         Event event = new Event();
         event.setTimeStamp(new Date().getTime());
-        event.setMetaData(new Object[]{"10.217.105.68", 
"php.isuruh.domain-1dadfdas"});
-        event.setPayloadData(new Object[]{"test log event"});
-        event.setArbitraryDataMap(new HashMap<String, String>());
+        event.setMetaData(dataContext.getMetaData());
+        event.setPayloadData(dataContext.getPayloadData());
 
         try {
             dataPublisher.publish(streamDefinition.getName(), 
streamDefinition.getVersion(), event);
@@ -81,7 +82,7 @@ public abstract class DataPublisher {
         }
     }
 
-    public void terminate () {
+    protected void terminate () {
 
         dataPublisher.stop();
     }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisherConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisherConfiguration.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisherConfiguration.java
index af1e59d..f0c4596 100644
--- 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisherConfiguration.java
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisherConfiguration.java
@@ -19,9 +19,26 @@
 
 package org.apache.stratos.cartridge.agent.data.publisher;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 public class DataPublisherConfiguration {
 
+    private static final Log log = 
LogFactory.getLog(DataPublisherConfiguration.class);
+
+    private static final String ENABLE_DATA_PUBLISHER = 
"enable.data.publisher";
+    private static final String MONITORING_SERVER_IP = "monitoring.server.ip";
+    private static final String MONITORING_SERVER_PORT = 
"monitoring.server.port";
+    private static final String MONITORING_SERVER_SECURE_PORT = 
"monitoring.server.secure.port";
+    private static final String MONITORING_SERVER_ADMIN_USERNAME = 
"monitoring.server.admin.username";
+    private static final String MONITORING_SERVER_ADMIN_PASSWORD = 
"monitoring.server.admin.password";
+
+    private boolean enable;
     private String monitoringServerUrl;
+    private String monitoringServerIp;
+    private String monitoringServerPort;
+    private String monitoringServerSecurePort;
     private String adminUsername;
     private String adminPassword;
     private static volatile DataPublisherConfiguration 
dataPublisherConfiguration;
@@ -31,7 +48,46 @@ public class DataPublisherConfiguration {
     }
 
     private void readConfig () {
-        //TODO: read and store
+
+        String enabled = System.getProperty(ENABLE_DATA_PUBLISHER);
+
+        setEnable(Boolean.parseBoolean(enabled));
+        if (!isEnabled()) {
+            log.info("Data Publisher disabled");
+            // disabled; no need to read other parameters
+            return;
+        }
+
+        log.info("Data publishing is enabled");
+
+        monitoringServerIp = System.getProperty(MONITORING_SERVER_IP);
+        if(StringUtils.isBlank(monitoringServerIp)) {
+            throw new RuntimeException("System property not found: " + 
MONITORING_SERVER_IP);
+        }
+
+        monitoringServerPort = System.getProperty(MONITORING_SERVER_PORT);
+        if(StringUtils.isBlank(monitoringServerPort)) {
+            throw new RuntimeException("System property not found: " + 
MONITORING_SERVER_PORT);
+        }
+
+        monitoringServerUrl = "tcp://" + monitoringServerIp + ":" + 
monitoringServerPort;
+
+        monitoringServerSecurePort = 
System.getProperty(MONITORING_SERVER_SECURE_PORT);
+        if(StringUtils.isBlank(monitoringServerSecurePort)) {
+            throw new RuntimeException("System property not found: " + 
MONITORING_SERVER_SECURE_PORT);
+        }
+
+        adminUsername = System.getProperty(MONITORING_SERVER_ADMIN_USERNAME);
+        if(StringUtils.isBlank(adminUsername)) {
+            throw new RuntimeException("System property not found: " + 
MONITORING_SERVER_ADMIN_USERNAME);
+        }
+
+        adminPassword = System.getProperty(MONITORING_SERVER_ADMIN_PASSWORD);
+        if(StringUtils.isBlank(adminPassword)) {
+            throw new RuntimeException("System property not found: " + 
MONITORING_SERVER_ADMIN_PASSWORD);
+        }
+
+        log.info("Data Publisher configuration initialized");
     }
 
     public static DataPublisherConfiguration getInstance () {
@@ -70,4 +126,36 @@ public class DataPublisherConfiguration {
     public void setAdminPassword(String adminPassword) {
         this.adminPassword = adminPassword;
     }
+
+    public boolean isEnabled() {
+        return enable;
+    }
+
+    public void setEnable(boolean enable) {
+        this.enable = enable;
+    }
+
+    public String getMonitoringServerPort() {
+        return monitoringServerPort;
+    }
+
+    public void setMonitoringServerPort(String monitoringServerPort) {
+        this.monitoringServerPort = monitoringServerPort;
+    }
+
+    public String getMonitoringServerSecurePort() {
+        return monitoringServerSecurePort;
+    }
+
+    public void setMonitoringServerSecurePort(String 
monitoringServerSecurePort) {
+        this.monitoringServerSecurePort = monitoringServerSecurePort;
+    }
+
+    public String getMonitoringServerIp() {
+        return monitoringServerIp;
+    }
+
+    public void setMonitoringServerIp(String monitoringServerIp) {
+        this.monitoringServerIp = monitoringServerIp;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/Constants.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/Constants.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/Constants.java
new file mode 100644
index 0000000..8a721fc
--- /dev/null
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/Constants.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cartridge.agent.data.publisher.log;
+
+public class Constants {
+
+    public static String LOG_PUBLISHER_STREAM_PREFIX = "logs.";
+    public static String LOG_PUBLISHER_STREAM_VERSION = "1.0.0";
+    public static String TAIL_COMMAND = "tail -n 100 -F ";
+    public static String MEMBER_ID = "memberId";
+    public static String LOG_EVENT = "logEvent";
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/FileBasedLogPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/FileBasedLogPublisher.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/FileBasedLogPublisher.java
index 8aaf461..f75aa27 100644
--- 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/FileBasedLogPublisher.java
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/FileBasedLogPublisher.java
@@ -32,59 +32,65 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 
-public class FileBasedLogPublisher extends LogPublisher {
+public class FileBasedLogPublisher extends LogPublisher implements Runnable {
 
     private static final Log log = 
LogFactory.getLog(FileBasedLogPublisher.class);
-    private String memberIp;
-    private String memberId;
+    private ExecutorService executorService;
+    private Process process;
+    private Scanner scanner;
 
-    public FileBasedLogPublisher(DataPublisherConfiguration 
dataPublisherConfig, StreamDefinition streamDefinition, String memberIp, String 
memberId) {
-        super(dataPublisherConfig, streamDefinition);
-        this.memberIp = memberIp;
-        this.memberId = memberId;
+    public FileBasedLogPublisher(DataPublisherConfiguration 
dataPublisherConfig, StreamDefinition streamDefinition, String filePath, String 
memberId) {
+
+        super(dataPublisherConfig, streamDefinition, filePath, memberId);
+        this.executorService = Executors.newSingleThreadExecutor(new 
FileBasedLogPublisherTaskThreadFactory(filePath));
     }
 
-    public void tailFileAndPublishLogs (String filePath) {
-        ExecutorService executorService = 
Executors.newSingleThreadExecutor(new 
FileBasedLogPublisherTaskThreadFactory(filePath));
-        executorService.submit(new FileBasedLogPublisherTask(filePath, this, 
memberId, memberIp));
+    public void start () {
+        executorService.submit(this);
     }
 
-    private class FileBasedLogPublisherTask implements Runnable {
+    public void stop () {
 
-        private String memberIp;
-        private String memberId;
-        private String filePath;
-        private FileBasedLogPublisher fileBasedLogPublisher;
+        // close the resources
+        try {
+            process.getInputStream().close();
+
+        } catch (IOException e) {
+            log.error("Error in closing [tail -F] input stream", e);
+        }
+        scanner.close();
+        process.destroy();
 
-        public FileBasedLogPublisherTask (String filePath, 
FileBasedLogPublisher fileBasedLogPublisher, String memberId, String memberIp) {
+        executorService.shutdownNow();
+        terminate();
 
-            this.filePath = filePath;
-            this.memberId = memberId;
-            this.memberIp = memberIp;
-            this.fileBasedLogPublisher = fileBasedLogPublisher;
+        log.info("Terminated log publisher for file: " + filePath);
+    }
+
+    @Override
+    public void run() {
+
+        Runtime r = Runtime.getRuntime();
+        try {
+            process = r.exec(Constants.TAIL_COMMAND + filePath);
+
+        } catch (IOException e) {
+            log.error("Error tailing file ", e);
+            throw new RuntimeException(e);
         }
 
-        @Override
-        public void run() {
-
-            Runtime r = Runtime.getRuntime();
-            Process p;
-            try {
-                p = r.exec("tail -F " + filePath);
-
-            } catch (IOException e) {
-                log.error("Error tailing file ", e);
-                throw new RuntimeException(e);
-            }
-
-            Scanner s = new Scanner(p.getInputStream());
-            while (s.hasNextLine()) {
-                DataContext dataContext = new DataContext();
-                dataContext.setCorrelationData(null);
-                dataContext.setMetaData(new Object[] {memberIp, memberId});
-                dataContext.setPayloadData(new Object[] {s.nextLine()});
-                fileBasedLogPublisher.publish(dataContext);
-            }
+        log.info("Starting log publisher for file: " + filePath + ", thread: " 
+ Thread.currentThread().getName());
+
+        scanner = new Scanner(process.getInputStream());
+        while (scanner.hasNextLine()) {
+
+            DataContext dataContext = new DataContext();
+            // set the relevant data
+            dataContext.setCorrelationData(null);
+            dataContext.setMetaData(new Object[] {memberId});
+            dataContext.setPayloadData(new Object[] {scanner.nextLine()});
+            // publish data
+            publish(dataContext);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisher.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisher.java
index 12dee6f..306f109 100644
--- 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisher.java
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisher.java
@@ -25,13 +25,25 @@ import 
org.apache.stratos.cartridge.agent.data.publisher.DataPublisher;
 import 
org.apache.stratos.cartridge.agent.data.publisher.DataPublisherConfiguration;
 import org.wso2.carbon.databridge.commons.StreamDefinition;
 
-public class LogPublisher extends DataPublisher {
+public abstract class LogPublisher extends DataPublisher {
 
     private static final Log log = LogFactory.getLog(LogPublisher.class);
 
-    public LogPublisher (DataPublisherConfiguration dataPublisherConfig, 
StreamDefinition streamDefinition) {
+    protected String memberId;
+    protected String filePath;
+
+    public LogPublisher (DataPublisherConfiguration dataPublisherConfig, 
StreamDefinition streamDefinition, String filePath, String memberId) {
+
         super(dataPublisherConfig, streamDefinition);
+        this.filePath = filePath;
+        this.memberId = memberId;
+    }
+
+    public void start () {
+
     }
 
+    public void stop () {
 
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisherManager.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisherManager.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisherManager.java
new file mode 100644
index 0000000..1049fdd
--- /dev/null
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisherManager.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cartridge.agent.data.publisher.log;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration;
+import 
org.apache.stratos.cartridge.agent.data.publisher.DataPublisherConfiguration;
+import 
org.apache.stratos.cartridge.agent.data.publisher.exception.DataPublisherException;
+import org.apache.stratos.cartridge.agent.util.CartridgeAgentUtils;
+import org.wso2.carbon.databridge.commons.Attribute;
+import org.wso2.carbon.databridge.commons.AttributeType;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+import 
org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+public class LogPublisherManager {
+
+    private static final Log log = 
LogFactory.getLog(LogPublisherManager.class);
+
+    private static DataPublisherConfiguration dataPublisherConfig = null;
+    private static StreamDefinition streamDefinition = null;
+    private static List<LogPublisher> fileBasedLogPublishers = new 
ArrayList<LogPublisher>();
+
+    public void init (DataPublisherConfiguration dataPublisherConfig) {
+
+        this.dataPublisherConfig = dataPublisherConfig;
+
+        List<Integer> ports = new ArrayList<Integer>();
+        
ports.add(Integer.parseInt(dataPublisherConfig.getMonitoringServerPort()));
+        
ports.add(Integer.parseInt(dataPublisherConfig.getMonitoringServerSecurePort()));
+
+        // wait till monitoring server ports are active
+        
CartridgeAgentUtils.waitUntilPortsActive(dataPublisherConfig.getMonitoringServerIp(),
 ports);
+
+        // stream definition identifier = {log.publisher.<cluster id>}
+        try {
+            streamDefinition = new 
StreamDefinition(Constants.LOG_PUBLISHER_STREAM_PREFIX + 
CartridgeAgentConfiguration.getInstance().getClusterId(),
+                    Constants.LOG_PUBLISHER_STREAM_VERSION);
+
+        } catch (MalformedStreamDefinitionException e) {
+            throw new RuntimeException(e);
+        }
+
+        streamDefinition.setDescription("Apache Stratos Instance Log 
Publisher");
+
+        List<Attribute> metaDataDefinition = new ArrayList<Attribute>();
+        metaDataDefinition.add(new Attribute(Constants.MEMBER_ID, 
AttributeType.STRING));
+
+        List<Attribute> payloadDataDefinition = new ArrayList<Attribute>();
+        payloadDataDefinition.add(new Attribute(Constants.LOG_EVENT, 
AttributeType.STRING));
+
+        streamDefinition.setMetaData(metaDataDefinition);
+        streamDefinition.setPayloadData(payloadDataDefinition);
+    }
+
+    public void start (String filePath) throws DataPublisherException {
+
+        File logFile = new File (filePath);
+        if (!logFile.exists() || !logFile.canRead() || logFile.isDirectory()) {
+            throw new DataPublisherException("Unable to read the file at path 
" + filePath);
+        }
+
+        LogPublisher fileBasedLogPublisher = new 
FileBasedLogPublisher(dataPublisherConfig, streamDefinition, filePath,
+               CartridgeAgentConfiguration.getInstance().getMemberId());
+
+        fileBasedLogPublisher.initialize();
+        fileBasedLogPublisher.start();
+
+        // add instance to list
+        fileBasedLogPublishers.add(fileBasedLogPublisher);
+    }
+
+    public void stop () {
+
+       if (dataPublisherConfig.isEnabled()) {
+           for (LogPublisher fileBasedLogPublisher : fileBasedLogPublishers) {
+               fileBasedLogPublisher.stop();
+           }
+       }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
index 9d54a87..1dae393 100644
--- 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
@@ -54,7 +54,7 @@ public class HealthStatisticsNotifier implements Runnable {
                 }
 
                 if (statsPublisher.isEnabled()) {
-                    if(!CartridgeAgentUtils.checkPortsActive()) {
+                    if(!CartridgeAgentUtils.checkPortsActive("localhost", 
CartridgeAgentConfiguration.getInstance().getPorts())) {
                         if(log.isInfoEnabled()) {
                             log.info("Publishing ports not open event");
                         }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsReader.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsReader.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsReader.java
index 1b186be..ce7e639 100644
--- 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsReader.java
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsReader.java
@@ -19,6 +19,7 @@
 
 package org.apache.stratos.cartridge.agent.statistics.publisher;
 
+import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration;
 import org.apache.stratos.cartridge.agent.util.CartridgeAgentUtils;
 
 import java.lang.management.ManagementFactory;
@@ -42,6 +43,6 @@ public class HealthStatisticsReader {
     }
 
     public static boolean allPortsActive() {
-        return CartridgeAgentUtils.checkPortsActive();
+        return CartridgeAgentUtils.checkPortsActive("localhost", 
CartridgeAgentConfiguration.getInstance().getPorts());
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentConstants.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentConstants.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentConstants.java
index 9914f6f..0a54453 100644
--- 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentConstants.java
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentConstants.java
@@ -42,6 +42,7 @@ public class CartridgeAgentConstants implements Serializable{
     public static final String MEMBER_ID = "MEMBER_ID";
     public static final String REPO_URL = "REPO_URL";
     public static final String PORTS = "PORTS";
+    public static final String LOG_FILE_PATHS = "LOG_FILE_PATHS";
     public static final String MEMORY_CONSUMPTION = "memory_consumption";
     public static final String LOAD_AVERAGE = "load_average";
     public static final String PORTS_NOT_OPEN = "ports_not_open";

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1b482b53/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentUtils.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentUtils.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentUtils.java
index 3650bb3..7e44cc9 100644
--- 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentUtils.java
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentUtils.java
@@ -46,7 +46,7 @@ public class CartridgeAgentUtils {
         StringTokenizer tokenizer = new StringTokenizer(string, delimiter);
         List<String> list = new ArrayList<String>(string.length());
         while (tokenizer.hasMoreTokens()) {
-            list.add(tokenizer.nextToken());
+            list.add(tokenizer.nextToken().trim());
         }
         return list;
     }
@@ -71,7 +71,7 @@ public class CartridgeAgentUtils {
         return decryptPassword;
     }
 
-    public static void waitUntilPortsActive() {
+    public static void waitUntilPortsActive(String ipAddress, List<Integer> 
ports) {
         long portCheckTimeOut = 1000 * 60 * 10;
         String portCheckTimeOutStr = System.getProperty("port.check.timeout");
         if (StringUtils.isNotBlank(portCheckTimeOutStr)) {
@@ -87,7 +87,7 @@ public class CartridgeAgentUtils {
             if(log.isInfoEnabled()) {
                 log.info("Waiting for ports to be active");
             }
-            active = checkPortsActive();
+            active = checkPortsActive(ipAddress,  ports);
             long endTime = System.currentTimeMillis();
             long duration = endTime - startTime;
             if (duration > portCheckTimeOut) {
@@ -100,15 +100,15 @@ public class CartridgeAgentUtils {
         }
     }
 
-    public static boolean checkPortsActive() {
-        List<Integer> ports = 
CartridgeAgentConfiguration.getInstance().getPorts();
+    public static boolean checkPortsActive(String ipAddress, List<Integer> 
ports) {
+        //List<Integer> ports = 
CartridgeAgentConfiguration.getInstance().getPorts();
         if (ports.size() == 0) {
             throw new RuntimeException("No ports found");
         }
         for (int port : ports) {
             Socket socket = null;
             try {
-                SocketAddress httpSockaddr = new 
InetSocketAddress("localhost", port);
+                SocketAddress httpSockaddr = new InetSocketAddress(ipAddress, 
port);
                 socket = new Socket();
                 socket.connect(httpSockaddr, 5000);
                 if (log.isInfoEnabled()) {

Reply via email to