Repository: karaf-decanter Updated Branches: refs/heads/master 5810e3414 -> 3f11fc382
[KARAF-3606] Create elastic search client at bundle start Project: http://git-wip-us.apache.org/repos/asf/karaf-decanter/repo Commit: http://git-wip-us.apache.org/repos/asf/karaf-decanter/commit/3f11fc38 Tree: http://git-wip-us.apache.org/repos/asf/karaf-decanter/tree/3f11fc38 Diff: http://git-wip-us.apache.org/repos/asf/karaf-decanter/diff/3f11fc38 Branch: refs/heads/master Commit: 3f11fc3828df4c2ccd8307f1e891c301ebea1237 Parents: 5810e34 Author: Christian Schneider <[email protected]> Authored: Thu Mar 12 12:15:42 2015 +0100 Committer: Christian Schneider <[email protected]> Committed: Thu Mar 12 12:15:42 2015 +0100 ---------------------------------------------------------------------- .../appender/elasticsearch/Activator.java | 21 ++- .../elasticsearch/ElasticsearchAppender.java | 129 +++++++++++-------- 2 files changed, 88 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/3f11fc38/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/Activator.java ---------------------------------------------------------------------- diff --git a/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/Activator.java b/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/Activator.java index 528ebf6..f43d137 100644 --- a/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/Activator.java +++ b/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/Activator.java @@ -16,29 +16,28 @@ */ package org.apache.karaf.decanter.appender.elasticsearch; +import java.util.Dictionary; +import java.util.Hashtable; + import org.apache.karaf.decanter.api.Appender; import org.osgi.framework.BundleActivator; import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceRegistration; - -import java.util.Dictionary; -import java.util.Properties; public class Activator implements BundleActivator { - private ServiceRegistration service; + private ElasticsearchAppender appender; public void start(BundleContext bundleContext) { - Appender appender = new ElasticsearchAppender(); - Properties properties = new Properties(); + // TODO embed mode and configuration admin support for location of Elasticsearch + appender = new ElasticsearchAppender("localhost", 9300); + appender.open(); + Dictionary<String, String> properties = new Hashtable<>(); properties.put("name", "elasticsearch"); - service = bundleContext.registerService(Appender.class, appender, (Dictionary) properties); + bundleContext.registerService(Appender.class, appender, properties); } public void stop(BundleContext bundleContext) { - if (service != null) { - service.unregister(); - } + appender.close();; } } http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/3f11fc38/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java ---------------------------------------------------------------------- diff --git a/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java b/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java index c71cc4b..0d45983 100644 --- a/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java +++ b/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java @@ -16,14 +16,7 @@ */ package org.apache.karaf.decanter.appender.elasticsearch; -import org.apache.karaf.decanter.api.Appender; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import java.text.SimpleDateFormat; import java.util.Date; @@ -33,64 +26,98 @@ import java.util.Map.Entry; import javax.json.Json; import javax.json.JsonObject; import javax.json.JsonObjectBuilder; -import javax.json.JsonValue; + +import org.apache.karaf.decanter.api.Appender; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Karaf Decanter appender which insert into Elasticsearch + * Karaf Decanter appender which inserts into Elasticsearch */ public class ElasticsearchAppender implements Appender { private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchAppender.class); - + private final SimpleDateFormat tsFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); - private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + private final SimpleDateFormat indexDateFormat = new SimpleDateFormat("yyyy.MM.dd"); + Client client; - public void append(Map<Long, Map<String, Object>> data) throws Exception { - LOGGER.debug("Appending into Elasticsearch"); + private String host; + private int port; - Client client = null; + public ElasticsearchAppender(String host, int port) { + this.host = host; + this.port = port; + } + + @SuppressWarnings("resource") + public void open() { + try { + Settings settings = settingsBuilder().classLoader(Settings.class.getClassLoader()).build(); + InetSocketTransportAddress address = new InetSocketTransportAddress(host, port); + client = new TransportClient(settings).addTransportAddress(address); + LOGGER.info("Starting Elasticsearch appender writing to {}", address.address()); + } catch (Exception e) { + LOGGER.error("Error connecting to elastic search", e); + } + } + + public void close() { + LOGGER.info("Stopping Elasticsearch appender"); + client.close(); + } + + public void append(Map<Long, Map<String, Object>> data) throws Exception { try { - // TODO embed mode and configuration admin support for location of Elasticsearch - LOGGER.debug("Connecting to Elasticsearch instance located localhost:9300"); - Settings settings = ImmutableSettings.settingsBuilder().classLoader(Settings.class.getClassLoader()).build(); - client = new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress("localhost", 9300)); - for(Entry<Long, Map<String, Object>> entry : data.entrySet()){ - Date date = new Date(entry.getKey()); - entry.getValue().put("@timestamp", tsFormat.format(date)); - String indexName = String.format("karaf_%s", dateFormat.format(date)); - - JsonObjectBuilder jsonObjectBuilder = Json.createObjectBuilder(); - for (Entry<String, Object> valueEntry : entry.getValue().entrySet()) { - if (valueEntry.getValue() instanceof String) { - jsonObjectBuilder.add(valueEntry.getKey(), (String) valueEntry.getValue()); - } else if (valueEntry.getValue() instanceof Map) { - Map<String, Object> value = (Map<String, Object>) valueEntry.getValue(); - JsonObjectBuilder innerBuilder = Json.createObjectBuilder(); - for (Entry<String, Object> innerEntrySet : value.entrySet()) { - Object object = innerEntrySet.getValue(); - if (object instanceof String) - innerBuilder.add(innerEntrySet.getKey(), (String) object); - else if (object instanceof Long) - innerBuilder.add(innerEntrySet.getKey(), (Long) object); - else if (object instanceof Integer) - innerBuilder.add(innerEntrySet.getKey(), (Integer) object); - else if (object instanceof Float) - innerBuilder.add(innerEntrySet.getKey(), (Float) object); - } - jsonObjectBuilder.add(valueEntry.getKey(), innerBuilder.build()); - } - } - JsonObject jsonObject = jsonObjectBuilder.build(); - client.prepareIndex(indexName, "karaf_event").setSource(jsonObject.toString()).execute().actionGet(); + for (Entry<Long, Map<String, Object>> entry : data.entrySet()) { + send(client, new Date(entry.getKey()), entry.getValue()); } - LOGGER.debug("Apppending done"); } catch (Exception e) { LOGGER.warn("Can't append into Elasticsearch", e); - } finally { - if (client != null) { - client.close(); + } + } + + private void send(Client client, Date date, Map<String, Object> props) { + props.put("@timestamp", tsFormat.format(date)); + JsonObjectBuilder jsonObjectBuilder = Json.createObjectBuilder(); + for (Entry<String, Object> valueEntry : props.entrySet()) { + Object value = valueEntry.getValue(); + if (value instanceof String) { + jsonObjectBuilder.add(valueEntry.getKey(), (String)value); + } else if (value instanceof Map) { + @SuppressWarnings("unchecked") + JsonObject jsonO = asJson(jsonObjectBuilder, (Map<String, Object>)value); + jsonObjectBuilder.add(valueEntry.getKey(), jsonO); } } + JsonObject jsonObject = jsonObjectBuilder.build(); + String indexName = getIndexName("karaf", date); + client.prepareIndex(indexName, "karaf_event").setSource(jsonObject.toString()).execute().actionGet(); + } + + private String getIndexName(String prefix, Date date) { + return prefix + "-" + indexDateFormat.format(date); + } + + private JsonObject asJson(JsonObjectBuilder jsonObjectBuilder, Map<String, Object> value) { + JsonObjectBuilder innerBuilder = Json.createObjectBuilder(); + for (Entry<String, Object> innerEntrySet : value.entrySet()) { + String key = innerEntrySet.getKey(); + Object object = innerEntrySet.getValue(); + if (object instanceof String) + innerBuilder.add(key, (String)object); + else if (object instanceof Long) + innerBuilder.add(key, (Long)object); + else if (object instanceof Integer) + innerBuilder.add(key, (Integer)object); + else if (object instanceof Float) + innerBuilder.add(key, (Float)object); + } + return innerBuilder.build(); } }
