Repository: karaf-decanter
Updated Branches:
  refs/heads/master 5810e3414 -> 3f11fc382


[KARAF-3606] Create elastic search client at bundle start


Project: http://git-wip-us.apache.org/repos/asf/karaf-decanter/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf-decanter/commit/3f11fc38
Tree: http://git-wip-us.apache.org/repos/asf/karaf-decanter/tree/3f11fc38
Diff: http://git-wip-us.apache.org/repos/asf/karaf-decanter/diff/3f11fc38

Branch: refs/heads/master
Commit: 3f11fc3828df4c2ccd8307f1e891c301ebea1237
Parents: 5810e34
Author: Christian Schneider <[email protected]>
Authored: Thu Mar 12 12:15:42 2015 +0100
Committer: Christian Schneider <[email protected]>
Committed: Thu Mar 12 12:15:42 2015 +0100

----------------------------------------------------------------------
 .../appender/elasticsearch/Activator.java       |  21 ++-
 .../elasticsearch/ElasticsearchAppender.java    | 129 +++++++++++--------
 2 files changed, 88 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/3f11fc38/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/Activator.java
----------------------------------------------------------------------
diff --git 
a/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/Activator.java
 
b/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/Activator.java
index 528ebf6..f43d137 100644
--- 
a/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/Activator.java
+++ 
b/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/Activator.java
@@ -16,29 +16,28 @@
  */
 package org.apache.karaf.decanter.appender.elasticsearch;
 
+import java.util.Dictionary;
+import java.util.Hashtable;
+
 import org.apache.karaf.decanter.api.Appender;
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
-
-import java.util.Dictionary;
-import java.util.Properties;
 
 public class Activator implements BundleActivator {
 
-    private ServiceRegistration service;
+    private ElasticsearchAppender appender;
 
     public void start(BundleContext bundleContext) {
-        Appender appender = new ElasticsearchAppender();
-        Properties properties = new Properties();
+        // TODO embed mode and configuration admin support for location of 
Elasticsearch
+        appender = new ElasticsearchAppender("localhost", 9300);
+        appender.open();
+        Dictionary<String, String> properties = new Hashtable<>();
         properties.put("name", "elasticsearch");
-        service = bundleContext.registerService(Appender.class, appender, 
(Dictionary) properties);
+        bundleContext.registerService(Appender.class, appender, properties);
     }
 
     public void stop(BundleContext bundleContext) {
-        if (service != null) {
-            service.unregister();
-        }
+        appender.close();;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/3f11fc38/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
----------------------------------------------------------------------
diff --git 
a/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
 
b/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
index c71cc4b..0d45983 100644
--- 
a/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
+++ 
b/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
@@ -16,14 +16,7 @@
  */
 package org.apache.karaf.decanter.appender.elasticsearch;
 
-import org.apache.karaf.decanter.api.Appender;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static 
org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
 
 import java.text.SimpleDateFormat;
 import java.util.Date;
@@ -33,64 +26,98 @@ import java.util.Map.Entry;
 import javax.json.Json;
 import javax.json.JsonObject;
 import javax.json.JsonObjectBuilder;
-import javax.json.JsonValue;
+
+import org.apache.karaf.decanter.api.Appender;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Karaf Decanter appender which insert into Elasticsearch
+ * Karaf Decanter appender which inserts into Elasticsearch
  */
 public class ElasticsearchAppender implements Appender {
 
     private final static Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchAppender.class);
-    
+
     private final SimpleDateFormat tsFormat = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
-    private final SimpleDateFormat dateFormat = new 
SimpleDateFormat("yyyy-MM-dd");
+    private final SimpleDateFormat indexDateFormat = new 
SimpleDateFormat("yyyy.MM.dd");
+    Client client;
 
-    public void append(Map<Long, Map<String, Object>> data) throws Exception {
-        LOGGER.debug("Appending into Elasticsearch");
+    private String host;
+    private int port;
 
-        Client client = null;
+    public ElasticsearchAppender(String host, int port) {
+        this.host = host;
+        this.port = port;
+    }
+
+    @SuppressWarnings("resource")
+    public void open() {
+        try {
+            Settings settings = 
settingsBuilder().classLoader(Settings.class.getClassLoader()).build();
+            InetSocketTransportAddress address = new 
InetSocketTransportAddress(host, port);
+            client = new 
TransportClient(settings).addTransportAddress(address);
+            LOGGER.info("Starting Elasticsearch appender writing to {}", 
address.address());
+        } catch (Exception e) {
+            LOGGER.error("Error connecting to elastic search", e);
+        }
+    }
+
+    public void close() {
+        LOGGER.info("Stopping Elasticsearch appender");
+        client.close();
+    }
+
+    public void append(Map<Long, Map<String, Object>> data) throws Exception {
         try {
-            // TODO embed mode and configuration admin support for location of 
Elasticsearch
-            LOGGER.debug("Connecting to Elasticsearch instance located 
localhost:9300");
-            Settings settings = 
ImmutableSettings.settingsBuilder().classLoader(Settings.class.getClassLoader()).build();
-            client = new TransportClient(settings).addTransportAddress(new 
InetSocketTransportAddress("localhost", 9300));
-            for(Entry<Long, Map<String, Object>> entry : data.entrySet()){
-               Date date = new Date(entry.getKey());
-               entry.getValue().put("@timestamp", tsFormat.format(date));
-               String indexName = String.format("karaf_%s", 
dateFormat.format(date));
-               
-               JsonObjectBuilder jsonObjectBuilder = 
Json.createObjectBuilder();
-               for (Entry<String, Object> valueEntry : 
entry.getValue().entrySet()) {
-                                       if (valueEntry.getValue() instanceof 
String) {
-                                               
jsonObjectBuilder.add(valueEntry.getKey(), (String) valueEntry.getValue());
-                                       } else if (valueEntry.getValue() 
instanceof Map) {
-                                               Map<String, Object> value = 
(Map<String, Object>) valueEntry.getValue();
-                                               JsonObjectBuilder innerBuilder 
= Json.createObjectBuilder();
-                                               for (Entry<String, Object> 
innerEntrySet : value.entrySet()) {
-                                                       Object object = 
innerEntrySet.getValue();
-                                                       if (object instanceof 
String)
-                                                               
innerBuilder.add(innerEntrySet.getKey(), (String) object);
-                                                       else if (object 
instanceof Long)
-                                                               
innerBuilder.add(innerEntrySet.getKey(), (Long) object);
-                                                       else if (object 
instanceof Integer)
-                                                               
innerBuilder.add(innerEntrySet.getKey(), (Integer) object);
-                                                       else if (object 
instanceof Float)
-                                                               
innerBuilder.add(innerEntrySet.getKey(), (Float) object);
-                                               }
-                                               
jsonObjectBuilder.add(valueEntry.getKey(), innerBuilder.build());
-                                       }
-                               }
-               JsonObject jsonObject = jsonObjectBuilder.build();
-               client.prepareIndex(indexName, 
"karaf_event").setSource(jsonObject.toString()).execute().actionGet();
+            for (Entry<Long, Map<String, Object>> entry : data.entrySet()) {
+                send(client, new Date(entry.getKey()), entry.getValue());
             }
-            LOGGER.debug("Apppending done");
         } catch (Exception e) {
             LOGGER.warn("Can't append into Elasticsearch", e);
-        } finally {
-            if (client != null) {
-                client.close();
+        }
+    }
+
+    private void send(Client client, Date date, Map<String, Object> props) {
+        props.put("@timestamp", tsFormat.format(date));
+        JsonObjectBuilder jsonObjectBuilder = Json.createObjectBuilder();
+        for (Entry<String, Object> valueEntry : props.entrySet()) {
+            Object value = valueEntry.getValue();
+            if (value instanceof String) {
+                jsonObjectBuilder.add(valueEntry.getKey(), (String)value);
+            } else if (value instanceof Map) {
+                @SuppressWarnings("unchecked")
+                JsonObject jsonO = asJson(jsonObjectBuilder, (Map<String, 
Object>)value);
+                jsonObjectBuilder.add(valueEntry.getKey(), jsonO);
             }
         }
+        JsonObject jsonObject = jsonObjectBuilder.build();
+        String indexName = getIndexName("karaf", date);
+        client.prepareIndex(indexName, 
"karaf_event").setSource(jsonObject.toString()).execute().actionGet();
+    }
+
+    private String getIndexName(String prefix, Date date) {
+        return prefix + "-" + indexDateFormat.format(date);
+    }
+
+    private JsonObject asJson(JsonObjectBuilder jsonObjectBuilder, Map<String, 
Object> value) {
+        JsonObjectBuilder innerBuilder = Json.createObjectBuilder();
+        for (Entry<String, Object> innerEntrySet : value.entrySet()) {
+            String key = innerEntrySet.getKey();
+            Object object = innerEntrySet.getValue();
+            if (object instanceof String)
+                innerBuilder.add(key, (String)object);
+            else if (object instanceof Long)
+                innerBuilder.add(key, (Long)object);
+            else if (object instanceof Integer)
+                innerBuilder.add(key, (Integer)object);
+            else if (object instanceof Float)
+                innerBuilder.add(key, (Float)object);
+        }
+        return innerBuilder.build();
     }
 
 }

Reply via email to