Updated Branches:
  refs/heads/master a2971ef9c -> ca578e311

initial log publisher implementation


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/f3f015c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/f3f015c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/f3f015c0

Branch: refs/heads/master
Commit: f3f015c09a036f9a4efb7d497fd466b9deff809b
Parents: 3d27c14
Author: Isuru <[email protected]>
Authored: Tue Feb 11 10:14:53 2014 +0530
Committer: Isuru <[email protected]>
Committed: Tue Feb 11 10:14:53 2014 +0530

----------------------------------------------------------------------
 .../apache/stratos/cartridge/agent/Main.java    |  34 ++++++
 .../agent/data/publisher/DataContext.java       |  52 ++++++++++
 .../agent/data/publisher/DataPublisher.java     |  96 +++++++++++++++++
 .../publisher/DataPublisherConfiguration.java   |  73 +++++++++++++
 .../exception/DataPublisherException.java       |  31 ++++++
 .../publisher/log/FileBasedLogPublisher.java    | 103 +++++++++++++++++++
 .../agent/data/publisher/log/LogPublisher.java  |  37 +++++++
 7 files changed, 426 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f015c0/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
index 744b258..5ca2196 100644
--- 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
@@ -24,6 +24,14 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration;
 import org.apache.stratos.cartridge.agent.config.configurator.JndiConfigurator;
+import 
org.apache.stratos.cartridge.agent.data.publisher.log.FileBasedLogPublisher;
+import org.wso2.carbon.databridge.commons.Attribute;
+import org.wso2.carbon.databridge.commons.AttributeType;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+import 
org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException;
+
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Cartridge agent main class.
@@ -58,6 +66,32 @@ public class Main {
                 cartridgeAgent.terminate();
             }
         }
+
+        StreamDefinition streamDefinition = null;
+
+        try {
+            streamDefinition = new StreamDefinition("log.publisher." + 
"php.isuruh.domain", "1.0.0");
+
+        } catch (MalformedStreamDefinitionException e) {
+            throw new RuntimeException(e);
+        }
+
+        streamDefinition.setDescription("Apache Stratos Instance Log 
Publisher");
+
+        List<Attribute> metaDataDefinition = new ArrayList<Attribute>();
+        metaDataDefinition.add(new Attribute("ipAddress", 
AttributeType.STRING));
+        metaDataDefinition.add(new Attribute("nodeId", AttributeType.STRING));
+
+        List<Attribute> payloadDataDefinition = new ArrayList<Attribute>();
+        payloadDataDefinition.add(new Attribute("logEvent", 
AttributeType.STRING));
+
+        streamDefinition.setMetaData(metaDataDefinition);
+        streamDefinition.setPayloadData(payloadDataDefinition);
+
+        FileBasedLogPublisher fileBasedLogPublisher = new 
FileBasedLogPublisher(null, streamDefinition, "", "");
+        fileBasedLogPublisher.initialize();
+        fileBasedLogPublisher.publish(null);
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f015c0/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataContext.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataContext.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataContext.java
new file mode 100644
index 0000000..0267bae
--- /dev/null
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataContext.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cartridge.agent.data.publisher;
+
+public class DataContext {
+
+    private Object [] metaData;
+    private Object [] correlationData;
+    private Object [] payloadData;
+
+
+    public Object[] getMetaData() {
+        return metaData;
+    }
+
+    public void setMetaData(Object[] metaData) {
+        this.metaData = metaData;
+    }
+
+    public Object[] getCorrelationData() {
+        return correlationData;
+    }
+
+    public void setCorrelationData(Object[] correlationData) {
+        this.correlationData = correlationData;
+    }
+
+    public Object[] getPayloadData() {
+        return payloadData;
+    }
+
+    public void setPayloadData(Object[] payloadData) {
+        this.payloadData = payloadData;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f015c0/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisher.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisher.java
new file mode 100644
index 0000000..8069de9
--- /dev/null
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisher.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cartridge.agent.data.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.databridge.agent.thrift.Agent;
+import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
+import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
+import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
+import org.wso2.carbon.databridge.commons.Event;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
+import java.util.Date;
+import java.util.HashMap;
+
+public abstract class DataPublisher {
+
+    private static final Log log = LogFactory.getLog(DataPublisher.class);
+
+    private StreamDefinition streamDefinition;
+    private DataPublisherConfiguration dataPublisherConfig;
+    private AsyncDataPublisher dataPublisher;
+    private boolean isDataPublisherInitialized;
+
+    public DataPublisher (DataPublisherConfiguration dataPublisherConfig, 
StreamDefinition streamDefinition) {
+
+        this.dataPublisherConfig = dataPublisherConfig;
+        this.streamDefinition = streamDefinition;
+        this.setDataPublisherInitialized(false);
+    }
+
+    public void initialize () {
+
+        AgentConfiguration agentConfiguration = new AgentConfiguration();
+        System.setProperty("javax.net.ssl.trustStore", 
"/home/isuru/wso2/S2/apache/stratos/alpha/wso2bam-2.4.0/repository/resources/security/client-truststore.jks");
+        System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
+        Agent agent = new Agent(agentConfiguration);
+
+        dataPublisher = new AsyncDataPublisher("tcp://10.217.105.68:7611", 
"admin", "admin", agent);
+
+        if (!dataPublisher.isStreamDefinitionAdded(streamDefinition.getName(), 
streamDefinition.getVersion())) {
+            dataPublisher.addStreamDefinition(streamDefinition);
+        }
+
+        setDataPublisherInitialized(true);
+    }
+
+    public void publish (DataContext dataContext) {
+
+        Event event = new Event();
+        event.setTimeStamp(new Date().getTime());
+        event.setMetaData(new Object[]{"10.217.105.68", 
"php.isuruh.domain-1dadfdas"});
+        event.setPayloadData(new Object[]{"test log event"});
+        event.setArbitraryDataMap(new HashMap<String, String>());
+
+        try {
+            dataPublisher.publish(streamDefinition.getName(), 
streamDefinition.getVersion(), event);
+
+        } catch (AgentException e) {
+            String errorMsg = "Error in publishing event";
+            log.error(errorMsg, e);
+            // no need to throw here
+        }
+    }
+
+    public void terminate () {
+
+        dataPublisher.stop();
+    }
+
+    public boolean isDataPublisherInitialized() {
+        return isDataPublisherInitialized;
+    }
+
+    public void setDataPublisherInitialized(boolean dataPublisherInitialized) {
+        isDataPublisherInitialized = dataPublisherInitialized;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f015c0/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisherConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisherConfiguration.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisherConfiguration.java
new file mode 100644
index 0000000..af1e59d
--- /dev/null
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisherConfiguration.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cartridge.agent.data.publisher;
+
+public class DataPublisherConfiguration {
+
+    private String monitoringServerUrl;
+    private String adminUsername;
+    private String adminPassword;
+    private static volatile DataPublisherConfiguration 
dataPublisherConfiguration;
+
+    private DataPublisherConfiguration () {
+        readConfig();
+    }
+
+    private void readConfig () {
+        //TODO: read and store
+    }
+
+    public static DataPublisherConfiguration getInstance () {
+
+        if (dataPublisherConfiguration == null) {
+            synchronized (DataPublisherConfiguration.class) {
+                if (dataPublisherConfiguration == null) {
+                    dataPublisherConfiguration = new 
DataPublisherConfiguration();
+                }
+            }
+        }
+
+        return dataPublisherConfiguration;
+    }
+
+    public String getMonitoringServerUrl() {
+        return monitoringServerUrl;
+    }
+
+    public void setMonitoringServerUrl(String monitoringServerUrl) {
+        this.monitoringServerUrl = monitoringServerUrl;
+    }
+
+    public String getAdminUsername() {
+        return adminUsername;
+    }
+
+    public void setAdminUsername(String adminUsername) {
+        this.adminUsername = adminUsername;
+    }
+
+    public String getAdminPassword() {
+        return adminPassword;
+    }
+
+    public void setAdminPassword(String adminPassword) {
+        this.adminPassword = adminPassword;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f015c0/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/exception/DataPublisherException.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/exception/DataPublisherException.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/exception/DataPublisherException.java
new file mode 100644
index 0000000..f1df0d2
--- /dev/null
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/exception/DataPublisherException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cartridge.agent.data.publisher.exception;
+
+public class DataPublisherException extends Exception {
+
+    public DataPublisherException(String msg) {
+        super(msg);
+    }
+
+    public DataPublisherException(String msg, Exception ex) {
+        super(msg, ex);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f015c0/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/FileBasedLogPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/FileBasedLogPublisher.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/FileBasedLogPublisher.java
new file mode 100644
index 0000000..8aaf461
--- /dev/null
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/FileBasedLogPublisher.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cartridge.agent.data.publisher.log;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cartridge.agent.data.publisher.DataContext;
+import 
org.apache.stratos.cartridge.agent.data.publisher.DataPublisherConfiguration;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
+import java.io.IOException;
+import java.util.Scanner;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+public class FileBasedLogPublisher extends LogPublisher {
+
+    private static final Log log = 
LogFactory.getLog(FileBasedLogPublisher.class);
+    private String memberIp;
+    private String memberId;
+
+    public FileBasedLogPublisher(DataPublisherConfiguration 
dataPublisherConfig, StreamDefinition streamDefinition, String memberIp, String 
memberId) {
+        super(dataPublisherConfig, streamDefinition);
+        this.memberIp = memberIp;
+        this.memberId = memberId;
+    }
+
+    public void tailFileAndPublishLogs (String filePath) {
+        ExecutorService executorService = 
Executors.newSingleThreadExecutor(new 
FileBasedLogPublisherTaskThreadFactory(filePath));
+        executorService.submit(new FileBasedLogPublisherTask(filePath, this, 
memberId, memberIp));
+    }
+
+    private class FileBasedLogPublisherTask implements Runnable {
+
+        private String memberIp;
+        private String memberId;
+        private String filePath;
+        private FileBasedLogPublisher fileBasedLogPublisher;
+
+        public FileBasedLogPublisherTask (String filePath, 
FileBasedLogPublisher fileBasedLogPublisher, String memberId, String memberIp) {
+
+            this.filePath = filePath;
+            this.memberId = memberId;
+            this.memberIp = memberIp;
+            this.fileBasedLogPublisher = fileBasedLogPublisher;
+        }
+
+        @Override
+        public void run() {
+
+            Runtime r = Runtime.getRuntime();
+            Process p;
+            try {
+                p = r.exec("tail -F " + filePath);
+
+            } catch (IOException e) {
+                log.error("Error tailing file ", e);
+                throw new RuntimeException(e);
+            }
+
+            Scanner s = new Scanner(p.getInputStream());
+            while (s.hasNextLine()) {
+                DataContext dataContext = new DataContext();
+                dataContext.setCorrelationData(null);
+                dataContext.setMetaData(new Object[] {memberIp, memberId});
+                dataContext.setPayloadData(new Object[] {s.nextLine()});
+                fileBasedLogPublisher.publish(dataContext);
+            }
+        }
+    }
+
+    class FileBasedLogPublisherTaskThreadFactory implements ThreadFactory {
+
+        private String filePath;
+
+        public FileBasedLogPublisherTaskThreadFactory (String filePath) {
+            this.filePath = filePath;
+        }
+
+        public Thread newThread(Runnable r) {
+            return new Thread(r, "File based log publisher thread  - " + 
filePath);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f015c0/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisher.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisher.java
new file mode 100644
index 0000000..12dee6f
--- /dev/null
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisher.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cartridge.agent.data.publisher.log;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cartridge.agent.data.publisher.DataPublisher;
+import 
org.apache.stratos.cartridge.agent.data.publisher.DataPublisherConfiguration;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
+public class LogPublisher extends DataPublisher {
+
+    private static final Log log = LogFactory.getLog(LogPublisher.class);
+
+    public LogPublisher (DataPublisherConfiguration dataPublisherConfig, 
StreamDefinition streamDefinition) {
+        super(dataPublisherConfig, streamDefinition);
+    }
+
+
+}

Reply via email to