Bulk indexing for the Elasticsearch appender, some test cases added, minor fixes 
for the embedded node, switch from JSON Glassfish RI to Apache Johnzon


Project: http://git-wip-us.apache.org/repos/asf/karaf-decanter/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf-decanter/commit/c305a7ac
Tree: http://git-wip-us.apache.org/repos/asf/karaf-decanter/tree/c305a7ac
Diff: http://git-wip-us.apache.org/repos/asf/karaf-decanter/diff/c305a7ac

Branch: refs/heads/master
Commit: c305a7acc76aae989b2e2abe35e4738ed4518a5c
Parents: 4152a3e
Author: Hendrik Saly <[email protected]>
Authored: Mon Mar 30 16:39:05 2015 +0200
Committer: Hendrik Saly <[email protected]>
Committed: Mon Mar 30 16:39:05 2015 +0200

----------------------------------------------------------------------
 appender/elasticsearch/pom.xml                  | 21 +++++-
 .../elasticsearch/ElasticsearchAppender.java    | 65 +++++++++++++++---
 .../TestElasticsearchAppender.java              | 72 ++++++++++++++++++++
 elasticsearch/pom.xml                           | 12 ++++
 .../decanter/elasticsearch/EmbeddedNode.java    |  4 +-
 .../elasticsearch/TestEmbeddedNode.java         | 43 ++++++++++++
 6 files changed, 206 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/c305a7ac/appender/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/appender/elasticsearch/pom.xml b/appender/elasticsearch/pom.xml
index d3037fb..ca284a9 100644
--- a/appender/elasticsearch/pom.xml
+++ b/appender/elasticsearch/pom.xml
@@ -56,7 +56,26 @@
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-api</artifactId>
                </dependency>
-
+        
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>1.7.7</version>
+            <scope>test</scope>
+        </dependency>
+        
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        
+         <dependency>
+            <groupId>org.apache.johnzon</groupId>
+            <artifactId>johnzon-mapper</artifactId>
+            <version>0.7-incubating</version>
+            <scope>test</scope>
+        </dependency>
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/c305a7ac/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
----------------------------------------------------------------------
diff --git 
a/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
 
b/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
index e9050cd..c4749f2 100644
--- 
a/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
+++ 
b/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
@@ -25,15 +25,23 @@ import java.util.Date;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
 
 import javax.json.Json;
 import javax.json.JsonObject;
 import javax.json.JsonObjectBuilder;
 
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventHandler;
 import org.slf4j.Logger;
@@ -45,9 +53,12 @@ import org.slf4j.LoggerFactory;
 public class ElasticsearchAppender implements EventHandler {
 
     private final static Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchAppender.class);
-
+    
     private final SimpleDateFormat tsFormat = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
     private final SimpleDateFormat indexDateFormat = new 
SimpleDateFormat("yyyy.MM.dd");
+    private final AtomicLong pendingBulkItemCount = new AtomicLong();
+    private final int concurrentRequests = 1;
+    private BulkProcessor bulkProcessor;
     Client client;
 
     private String host;
@@ -67,6 +78,29 @@ public class ElasticsearchAppender implements EventHandler {
             Settings settings = 
settingsBuilder().classLoader(Settings.class.getClassLoader()).build();
             InetSocketTransportAddress address = new 
InetSocketTransportAddress(host, port);
             client = new 
TransportClient(settings).addTransportAddress(address);
+            bulkProcessor = BulkProcessor.builder(client, new 
BulkProcessor.Listener() {
+                
+                @Override
+                public void beforeBulk(long executionId, BulkRequest request) {
+                    pendingBulkItemCount.addAndGet(request.numberOfActions());
+                }
+                
+                @Override
+                public void afterBulk(long executionId, BulkRequest request, 
Throwable failure) {
+                    LOGGER.warn("Can't append into Elasticsearch", failure);
+                    pendingBulkItemCount.addAndGet(-request.numberOfActions());
+                }
+                
+                @Override
+                public void afterBulk(long executionId, BulkRequest request, 
BulkResponse response) {
+                    
pendingBulkItemCount.addAndGet(-response.getItems().length);
+                }
+            })
+            .setBulkActions(1000)
+            .setConcurrentRequests(concurrentRequests)
+            .setBulkSize(ByteSizeValue.parseBytesSizeValue("5mb"))
+            .setFlushInterval(TimeValue.timeValueSeconds(5))
+            .build();
             LOGGER.info("Starting Elasticsearch appender writing to {}", 
address.address());
         } catch (Exception e) {
             LOGGER.error("Error connecting to elastic search", e);
@@ -75,22 +109,34 @@ public class ElasticsearchAppender implements EventHandler 
{
 
     public void close() {
         LOGGER.info("Stopping Elasticsearch appender");
-        client.close();
+        
+        if(bulkProcessor != null) {
+            bulkProcessor.close();
+        }
+
+        // if concurrentRequests > 0 we'll wait here because close() triggers a 
flush which is async
+        while(concurrentRequests > 0 && pendingBulkItemCount.get() > 0) {
+            LockSupport.parkNanos(1000*50);
+        }
+        
+        if(client != null) {
+            client.close();
+        }
     }
 
     @Override
     public void handleEvent(Event event) {
         try {
-            send(client, event);
+            send(event);
         } catch (Exception e) {
             LOGGER.warn("Can't append into Elasticsearch", e);
         }
     }
 
     @SuppressWarnings("unchecked")
-    private void send(Client client, Event event) {
+    private void send(Event event) {
         Long ts = (Long)event.getProperty("timestamp");
-        Date date = ts != null ? new Date((Long)ts) : new Date();
+        Date date = ts != null ? new Date(ts) : new Date();
         
         JsonObjectBuilder jsonObjectBuilder = Json.createObjectBuilder();
         jsonObjectBuilder.add("@timestamp", tsFormat.format(date));
@@ -105,9 +151,12 @@ public class ElasticsearchAppender implements EventHandler 
{
         JsonObject jsonObject = jsonObjectBuilder.build();
         String indexName = getIndexName("karaf", date);
         String jsonSt = jsonObject.toString();
-        LOGGER.debug("Sending event to elastic search with content: {}", 
jsonSt);
- 
-        client.prepareIndex(indexName, 
getType(event)).setSource(jsonSt).execute().actionGet();
+        
+        if(LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Sending event to elastic search with content: {}", 
jsonSt);
+        }
+        
+        bulkProcessor.add(new IndexRequest(indexName, 
getType(event)).source(jsonSt));
     }
 
     private String getType(Event event) {

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/c305a7ac/appender/elasticsearch/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java
----------------------------------------------------------------------
diff --git 
a/appender/elasticsearch/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java
 
b/appender/elasticsearch/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java
new file mode 100644
index 0000000..f2cce5c
--- /dev/null
+++ 
b/appender/elasticsearch/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.appender.elasticsearch;
+
+import static 
org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
+
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.collect.MapBuilder;
+import org.elasticsearch.common.collect.Maps;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.Node;
+import org.junit.Assert;
+import org.junit.Test;
+import org.osgi.service.event.Event;
+
+import static org.elasticsearch.node.NodeBuilder.*;
+
+
+public class TestElasticsearchAppender {
+
+   @Test
+   public void testAppender() throws Exception {
+       
+       Settings settings = settingsBuilder()
+               .put("cluster.name", "elasticsearch")
+               .put("http.enabled", "true")
+               .put("node.data", true)
+               .put("path.data", "target/data")
+               .put("discovery.type", "zen")
+               .put("discovery.zen.multicast.enabled", false)
+               .put("discovery.zen.ping.unicast.enabled", true)
+               .put("discovery.zen.unicast.hosts", "127.0.0.1")
+               .put("network.host", "127.0.0.1")
+               .put("index.store.type", "memory")
+               .put("index.store.fs.memory.enabled", "true")
+               .put("gateway.type", "none")
+               .put("path.plugins", "target/plugins")
+               .build();
+       
+       Node node = nodeBuilder().settings(settings).node();
+       
+       ElasticsearchAppender appender = new ElasticsearchAppender("127.0.0.1", 
9300);
+       appender.open();
+       appender.handleEvent(new Event("testTopic", MapBuilder.<String, 
String>newMapBuilder().put("a", "b").put("c", "d").map()));
+       appender.handleEvent(new Event("testTopic", MapBuilder.<String, 
String>newMapBuilder().put("a", "b").put("c", "d").map()));
+       appender.handleEvent(new Event("testTopic", MapBuilder.<String, 
String>newMapBuilder().put("a", "b").put("c", "d").map()));
+       appender.close();
+       
+       int maxTryCount = 10;
+       for(int i=0; 
node.client().count(Requests.countRequest()).actionGet().getCount() == 0 && i< 
maxTryCount; i++) {
+           Thread.sleep(500);
+       }
+       
+       Assert.assertEquals(3L, 
node.client().count(Requests.countRequest()).actionGet().getCount());
+       node.close();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/c305a7ac/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml
index c862e41..2b7a281 100644
--- a/elasticsearch/pom.xml
+++ b/elasticsearch/pom.xml
@@ -48,12 +48,24 @@
             <artifactId>slf4j-api</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>1.7.7</version>
+            <scope>test</scope>
+        </dependency>
+
         <!-- OSGi -->
         <dependency>
             <groupId>org.osgi</groupId>
             <artifactId>org.osgi.core</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/c305a7ac/elasticsearch/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java
 
b/elasticsearch/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java
index 6eb1be9..82041af 100644
--- 
a/elasticsearch/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java
+++ 
b/elasticsearch/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java
@@ -54,7 +54,7 @@ public class EmbeddedNode {
                 .put("discovery.zen.ping.unicast.enabled", true)
                 .put("discovery.zen.unicast.hosts", "127.0.0.1")
                 .put("network.host", "127.0.0.1")
-                .put("gateway.type", "none")
+                .put("gateway.type", "local")
                 .put("cluster.routing.schedule", "50ms")
                 .put("path.plugins", pluginsFile.getAbsolutePath())
                 .build();
@@ -65,7 +65,7 @@ public class EmbeddedNode {
         builder.classLoader(Settings.class.getClassLoader());
         node = new InternalNode(builder.build(), false);
 
-        LOGGER.info("Elasticsearch node started");
+        LOGGER.info("Elasticsearch node created");
     }
 
     public void start() throws Exception {

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/c305a7ac/elasticsearch/src/test/java/org/apache/karaf/decanter/elasticsearch/TestEmbeddedNode.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/test/java/org/apache/karaf/decanter/elasticsearch/TestEmbeddedNode.java
 
b/elasticsearch/src/test/java/org/apache/karaf/decanter/elasticsearch/TestEmbeddedNode.java
new file mode 100644
index 0000000..f97721e
--- /dev/null
+++ 
b/elasticsearch/src/test/java/org/apache/karaf/decanter/elasticsearch/TestEmbeddedNode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.elasticsearch;
+
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.node.Node;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class TestEmbeddedNode {
+
+    @Test
+    public void testNode() throws Exception {
+   
+        EmbeddedNode embeddedNode = new EmbeddedNode();
+        Node node = embeddedNode.getNode();
+        embeddedNode.start();
+        ClusterHealthResponse healthResponse = 
node.client().admin().cluster().health(Requests.clusterHealthRequest()).actionGet();
+        assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus());
+        embeddedNode.stop();
+        
+    }
+    
+
+}

Reply via email to