Updated Branches: refs/heads/master a2971ef9c -> ca578e311
initial log publisher implementation Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/f3f015c0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/f3f015c0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/f3f015c0 Branch: refs/heads/master Commit: f3f015c09a036f9a4efb7d497fd466b9deff809b Parents: 3d27c14 Author: Isuru <[email protected]> Authored: Tue Feb 11 10:14:53 2014 +0530 Committer: Isuru <[email protected]> Committed: Tue Feb 11 10:14:53 2014 +0530 ---------------------------------------------------------------------- .../apache/stratos/cartridge/agent/Main.java | 34 ++++++ .../agent/data/publisher/DataContext.java | 52 ++++++++++ .../agent/data/publisher/DataPublisher.java | 96 +++++++++++++++++ .../publisher/DataPublisherConfiguration.java | 73 +++++++++++++ .../exception/DataPublisherException.java | 31 ++++++ .../publisher/log/FileBasedLogPublisher.java | 103 +++++++++++++++++++ .../agent/data/publisher/log/LogPublisher.java | 37 +++++++ 7 files changed, 426 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f015c0/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java index 744b258..5ca2196 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java @@ -24,6 +24,14 @@ import org.apache.commons.logging.LogFactory; import org.apache.log4j.PropertyConfigurator; import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration; import org.apache.stratos.cartridge.agent.config.configurator.JndiConfigurator; +import org.apache.stratos.cartridge.agent.data.publisher.log.FileBasedLogPublisher; +import org.wso2.carbon.databridge.commons.Attribute; +import org.wso2.carbon.databridge.commons.AttributeType; +import org.wso2.carbon.databridge.commons.StreamDefinition; +import org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException; + +import java.util.ArrayList; +import java.util.List; /** * Cartridge agent main class. @@ -58,6 +66,32 @@ public class Main { cartridgeAgent.terminate(); } } + + StreamDefinition streamDefinition = null; + + try { + streamDefinition = new StreamDefinition("log.publisher." + "php.isuruh.domain", "1.0.0"); + + } catch (MalformedStreamDefinitionException e) { + throw new RuntimeException(e); + } + + streamDefinition.setDescription("Apache Stratos Instance Log Publisher"); + + List<Attribute> metaDataDefinition = new ArrayList<Attribute>(); + metaDataDefinition.add(new Attribute("ipAddress", AttributeType.STRING)); + metaDataDefinition.add(new Attribute("nodeId", AttributeType.STRING)); + + List<Attribute> payloadDataDefinition = new ArrayList<Attribute>(); + payloadDataDefinition.add(new Attribute("logEvent", AttributeType.STRING)); + + streamDefinition.setMetaData(metaDataDefinition); + streamDefinition.setPayloadData(payloadDataDefinition); + + FileBasedLogPublisher fileBasedLogPublisher = new FileBasedLogPublisher(null, streamDefinition, "", ""); + fileBasedLogPublisher.initialize(); + fileBasedLogPublisher.publish(null); + } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f015c0/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataContext.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataContext.java new file mode 100644 index 0000000..0267bae --- /dev/null +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataContext.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cartridge.agent.data.publisher; + +public class DataContext { + + private Object [] metaData; + private Object [] correlationData; + private Object [] payloadData; + + + public Object[] getMetaData() { + return metaData; + } + + public void setMetaData(Object[] metaData) { + this.metaData = metaData; + } + + public Object[] getCorrelationData() { + return correlationData; + } + + public void setCorrelationData(Object[] correlationData) { + this.correlationData = correlationData; + } + + public Object[] getPayloadData() { + return payloadData; + } + + public void setPayloadData(Object[] payloadData) { + this.payloadData = payloadData; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f015c0/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisher.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisher.java new file mode 100644 index 0000000..8069de9 --- /dev/null +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisher.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cartridge.agent.data.publisher; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.databridge.agent.thrift.Agent; +import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher; +import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration; +import org.wso2.carbon.databridge.agent.thrift.exception.AgentException; +import org.wso2.carbon.databridge.commons.Event; +import org.wso2.carbon.databridge.commons.StreamDefinition; + +import java.util.Date; +import java.util.HashMap; + +public abstract class DataPublisher { + + private static final Log log = LogFactory.getLog(DataPublisher.class); + + private StreamDefinition streamDefinition; + private DataPublisherConfiguration dataPublisherConfig; + private AsyncDataPublisher dataPublisher; + private boolean isDataPublisherInitialized; + + public DataPublisher (DataPublisherConfiguration dataPublisherConfig, StreamDefinition streamDefinition) { + + this.dataPublisherConfig = dataPublisherConfig; + this.streamDefinition = streamDefinition; + this.setDataPublisherInitialized(false); + } + + public void initialize () { + + AgentConfiguration agentConfiguration = new AgentConfiguration(); + System.setProperty("javax.net.ssl.trustStore", "/home/isuru/wso2/S2/apache/stratos/alpha/wso2bam-2.4.0/repository/resources/security/client-truststore.jks"); + System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon"); + Agent agent = new Agent(agentConfiguration); + + dataPublisher = new AsyncDataPublisher("tcp://10.217.105.68:7611", "admin", "admin", agent); + + if (!dataPublisher.isStreamDefinitionAdded(streamDefinition.getName(), streamDefinition.getVersion())) { + dataPublisher.addStreamDefinition(streamDefinition); + } + + setDataPublisherInitialized(true); + } + + public void publish (DataContext dataContext) { + + Event event = new Event(); + event.setTimeStamp(new Date().getTime()); + event.setMetaData(new Object[]{"10.217.105.68", "php.isuruh.domain-1dadfdas"}); + event.setPayloadData(new Object[]{"test log event"}); + event.setArbitraryDataMap(new HashMap<String, String>()); + + try { + dataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event); + + } catch (AgentException e) { + String errorMsg = "Error in publishing event"; + log.error(errorMsg, e); + // no need to throw here + } + } + + public void terminate () { + + dataPublisher.stop(); + } + + public boolean isDataPublisherInitialized() { + return isDataPublisherInitialized; + } + + public void setDataPublisherInitialized(boolean dataPublisherInitialized) { + isDataPublisherInitialized = dataPublisherInitialized; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f015c0/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisherConfiguration.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisherConfiguration.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisherConfiguration.java new file mode 100644 index 0000000..af1e59d --- /dev/null +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/DataPublisherConfiguration.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cartridge.agent.data.publisher; + +public class DataPublisherConfiguration { + + private String monitoringServerUrl; + private String adminUsername; + private String adminPassword; + private static volatile DataPublisherConfiguration dataPublisherConfiguration; + + private DataPublisherConfiguration () { + readConfig(); + } + + private void readConfig () { + //TODO: read and store + } + + public static DataPublisherConfiguration getInstance () { + + if (dataPublisherConfiguration == null) { + synchronized (DataPublisherConfiguration.class) { + if (dataPublisherConfiguration == null) { + dataPublisherConfiguration = new DataPublisherConfiguration(); + } + } + } + + return dataPublisherConfiguration; + } + + public String getMonitoringServerUrl() { + return monitoringServerUrl; + } + + public void setMonitoringServerUrl(String monitoringServerUrl) { + this.monitoringServerUrl = monitoringServerUrl; + } + + public String getAdminUsername() { + return adminUsername; + } + + public void setAdminUsername(String adminUsername) { + this.adminUsername = adminUsername; + } + + public String getAdminPassword() { + return adminPassword; + } + + public void setAdminPassword(String adminPassword) { + this.adminPassword = adminPassword; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f015c0/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/exception/DataPublisherException.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/exception/DataPublisherException.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/exception/DataPublisherException.java new file mode 100644 index 0000000..f1df0d2 --- /dev/null +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/exception/DataPublisherException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cartridge.agent.data.publisher.exception; + +public class DataPublisherException extends Exception { + + public DataPublisherException(String msg) { + super(msg); + } + + public DataPublisherException(String msg, Exception ex) { + super(msg, ex); + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f015c0/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/FileBasedLogPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/FileBasedLogPublisher.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/FileBasedLogPublisher.java new file mode 100644 index 0000000..8aaf461 --- /dev/null +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/FileBasedLogPublisher.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cartridge.agent.data.publisher.log; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cartridge.agent.data.publisher.DataContext; +import org.apache.stratos.cartridge.agent.data.publisher.DataPublisherConfiguration; +import org.wso2.carbon.databridge.commons.StreamDefinition; + +import java.io.IOException; +import java.util.Scanner; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +public class FileBasedLogPublisher extends LogPublisher { + + private static final Log log = LogFactory.getLog(FileBasedLogPublisher.class); + private String memberIp; + private String memberId; + + public FileBasedLogPublisher(DataPublisherConfiguration dataPublisherConfig, StreamDefinition streamDefinition, String memberIp, String memberId) { + super(dataPublisherConfig, streamDefinition); + this.memberIp = memberIp; + this.memberId = memberId; + } + + public void tailFileAndPublishLogs (String filePath) { + ExecutorService executorService = Executors.newSingleThreadExecutor(new FileBasedLogPublisherTaskThreadFactory(filePath)); + executorService.submit(new FileBasedLogPublisherTask(filePath, this, memberId, memberIp)); + } + + private class FileBasedLogPublisherTask implements Runnable { + + private String memberIp; + private String memberId; + private String filePath; + private FileBasedLogPublisher fileBasedLogPublisher; + + public FileBasedLogPublisherTask (String filePath, FileBasedLogPublisher fileBasedLogPublisher, String memberId, String memberIp) { + + this.filePath = filePath; + this.memberId = memberId; + this.memberIp = memberIp; + this.fileBasedLogPublisher = fileBasedLogPublisher; + } + + @Override + public void run() { + + Runtime r = Runtime.getRuntime(); + Process p; + try { + p = r.exec("tail -F " + filePath); + + } catch (IOException e) { + log.error("Error tailing file ", e); + throw new RuntimeException(e); + } + + Scanner s = new Scanner(p.getInputStream()); + while (s.hasNextLine()) { + DataContext dataContext = new DataContext(); + dataContext.setCorrelationData(null); + dataContext.setMetaData(new Object[] {memberIp, memberId}); + dataContext.setPayloadData(new Object[] {s.nextLine()}); + fileBasedLogPublisher.publish(dataContext); + } + } + } + + class FileBasedLogPublisherTaskThreadFactory implements ThreadFactory { + + private String filePath; + + public FileBasedLogPublisherTaskThreadFactory (String filePath) { + this.filePath = filePath; + } + + public Thread newThread(Runnable r) { + return new Thread(r, "File based log publisher thread - " + filePath); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f015c0/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisher.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisher.java new file mode 100644 index 0000000..12dee6f --- /dev/null +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/data/publisher/log/LogPublisher.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cartridge.agent.data.publisher.log; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cartridge.agent.data.publisher.DataPublisher; +import org.apache.stratos.cartridge.agent.data.publisher.DataPublisherConfiguration; +import org.wso2.carbon.databridge.commons.StreamDefinition; + +public class LogPublisher extends DataPublisher { + + private static final Log log = LogFactory.getLog(LogPublisher.class); + + public LogPublisher (DataPublisherConfiguration dataPublisherConfig, StreamDefinition streamDefinition) { + super(dataPublisherConfig, streamDefinition); + } + + +}
