Bulk indexing for the Elasticsearch appender, some test cases added, minor fixes for the embedded node, switch from the JSON Glassfish RI to Apache Johnzon
Project: http://git-wip-us.apache.org/repos/asf/karaf-decanter/repo Commit: http://git-wip-us.apache.org/repos/asf/karaf-decanter/commit/c305a7ac Tree: http://git-wip-us.apache.org/repos/asf/karaf-decanter/tree/c305a7ac Diff: http://git-wip-us.apache.org/repos/asf/karaf-decanter/diff/c305a7ac Branch: refs/heads/master Commit: c305a7acc76aae989b2e2abe35e4738ed4518a5c Parents: 4152a3e Author: Hendrik Saly <[email protected]> Authored: Mon Mar 30 16:39:05 2015 +0200 Committer: Hendrik Saly <[email protected]> Committed: Mon Mar 30 16:39:05 2015 +0200 ---------------------------------------------------------------------- appender/elasticsearch/pom.xml | 21 +++++- .../elasticsearch/ElasticsearchAppender.java | 65 +++++++++++++++--- .../TestElasticsearchAppender.java | 72 ++++++++++++++++++++ elasticsearch/pom.xml | 12 ++++ .../decanter/elasticsearch/EmbeddedNode.java | 4 +- .../elasticsearch/TestEmbeddedNode.java | 43 ++++++++++++ 6 files changed, 206 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/c305a7ac/appender/elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/appender/elasticsearch/pom.xml b/appender/elasticsearch/pom.xml index d3037fb..ca284a9 100644 --- a/appender/elasticsearch/pom.xml +++ b/appender/elasticsearch/pom.xml @@ -56,7 +56,26 @@ <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> - + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <version>1.7.7</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.johnzon</groupId> + <artifactId>johnzon-mapper</artifactId> + <version>0.7-incubating</version> + <scope>test</scope> + </dependency> </dependencies> <build> 
http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/c305a7ac/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java ---------------------------------------------------------------------- diff --git a/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java b/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java index e9050cd..c4749f2 100644 --- a/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java +++ b/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java @@ -25,15 +25,23 @@ import java.util.Date; import java.util.Map; import java.util.Map.Entry; import java.util.TimeZone; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; import javax.json.Json; import javax.json.JsonObject; import javax.json.JsonObjectBuilder; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.osgi.service.event.Event; import org.osgi.service.event.EventHandler; import org.slf4j.Logger; @@ -45,9 +53,12 @@ import org.slf4j.LoggerFactory; public class ElasticsearchAppender implements EventHandler { private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchAppender.class); - + private final SimpleDateFormat tsFormat = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); private final SimpleDateFormat indexDateFormat = new SimpleDateFormat("yyyy.MM.dd"); + private final AtomicLong pendingBulkItemCount = new AtomicLong(); + private final int concurrentRequests = 1; + private BulkProcessor bulkProcessor; Client client; private String host; @@ -67,6 +78,29 @@ public class ElasticsearchAppender implements EventHandler { Settings settings = settingsBuilder().classLoader(Settings.class.getClassLoader()).build(); InetSocketTransportAddress address = new InetSocketTransportAddress(host, port); client = new TransportClient(settings).addTransportAddress(address); + bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { + + @Override + public void beforeBulk(long executionId, BulkRequest request) { + pendingBulkItemCount.addAndGet(request.numberOfActions()); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + LOGGER.warn("Can't append into Elasticsearch", failure); + pendingBulkItemCount.addAndGet(-request.numberOfActions()); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + pendingBulkItemCount.addAndGet(-response.getItems().length); + } + }) + .setBulkActions(1000) + .setConcurrentRequests(concurrentRequests) + .setBulkSize(ByteSizeValue.parseBytesSizeValue("5mb")) + .setFlushInterval(TimeValue.timeValueSeconds(5)) + .build(); LOGGER.info("Starting Elasticsearch appender writing to {}", address.address()); } catch (Exception e) { LOGGER.error("Error connecting to elastic search", e); @@ -75,22 +109,34 @@ public class ElasticsearchAppender implements EventHandler { public void close() { LOGGER.info("Stopping Elasticsearch appender"); - client.close(); + + if(bulkProcessor != null) { + bulkProcessor.close(); + } + + //if ConcurrentRequests > 0 we ll wait here because close() triggers a flush which is async + while(concurrentRequests > 0 && pendingBulkItemCount.get() > 
0) { + LockSupport.parkNanos(1000*50); + } + + if(client != null) { + client.close(); + } } @Override public void handleEvent(Event event) { try { - send(client, event); + send(event); } catch (Exception e) { LOGGER.warn("Can't append into Elasticsearch", e); } } @SuppressWarnings("unchecked") - private void send(Client client, Event event) { + private void send(Event event) { Long ts = (Long)event.getProperty("timestamp"); - Date date = ts != null ? new Date((Long)ts) : new Date(); + Date date = ts != null ? new Date(ts) : new Date(); JsonObjectBuilder jsonObjectBuilder = Json.createObjectBuilder(); jsonObjectBuilder.add("@timestamp", tsFormat.format(date)); @@ -105,9 +151,12 @@ public class ElasticsearchAppender implements EventHandler { JsonObject jsonObject = jsonObjectBuilder.build(); String indexName = getIndexName("karaf", date); String jsonSt = jsonObject.toString(); - LOGGER.debug("Sending event to elastic search with content: {}", jsonSt); - - client.prepareIndex(indexName, getType(event)).setSource(jsonSt).execute().actionGet(); + + if(LOGGER.isDebugEnabled()) { + LOGGER.debug("Sending event to elastic search with content: {}", jsonSt); + } + + bulkProcessor.add(new IndexRequest(indexName, getType(event)).source(jsonSt)); } private String getType(Event event) { http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/c305a7ac/appender/elasticsearch/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java ---------------------------------------------------------------------- diff --git a/appender/elasticsearch/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java b/appender/elasticsearch/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java new file mode 100644 index 0000000..f2cce5c --- /dev/null +++ b/appender/elasticsearch/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java @@ -0,0 +1,72 @@ +/* 
+ * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.decanter.appender.elasticsearch; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; + +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.collect.Maps; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.Node; +import org.junit.Assert; +import org.junit.Test; +import org.osgi.service.event.Event; + +import static org.elasticsearch.node.NodeBuilder.*; + + +public class TestElasticsearchAppender { + + @Test + public void testAppender() throws Exception { + + Settings settings = settingsBuilder() + .put("cluster.name", "elasticsearch") + .put("http.enabled", "true") + .put("node.data", true) + .put("path.data", "target/data") + .put("discovery.type", "zen") + .put("discovery.zen.multicast.enabled", false) + .put("discovery.zen.ping.unicast.enabled", true) + .put("discovery.zen.unicast.hosts", "127.0.0.1") + .put("network.host", "127.0.0.1") + .put("index.store.type", "memory") + .put("index.store.fs.memory.enabled", "true") + .put("gateway.type", "none") + .put("path.plugins", "target/plugins") + .build(); + + 
Node node = nodeBuilder().settings(settings).node(); + + ElasticsearchAppender appender = new ElasticsearchAppender("127.0.0.1", 9300); + appender.open(); + appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map())); + appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map())); + appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map())); + appender.close(); + + int maxTryCount = 10; + for(int i=0; node.client().count(Requests.countRequest()).actionGet().getCount() == 0 && i< maxTryCount; i++) { + Thread.sleep(500); + } + + Assert.assertEquals(3L, node.client().count(Requests.countRequest()).actionGet().getCount()); + node.close(); + } + +} http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/c305a7ac/elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml index c862e41..2b7a281 100644 --- a/elasticsearch/pom.xml +++ b/elasticsearch/pom.xml @@ -48,12 +48,24 @@ <artifactId>slf4j-api</artifactId> </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <version>1.7.7</version> + <scope>test</scope> + </dependency> + <!-- OSGi --> <dependency> <groupId>org.osgi</groupId> <artifactId>org.osgi.core</artifactId> </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/c305a7ac/elasticsearch/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java 
b/elasticsearch/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java index 6eb1be9..82041af 100644 --- a/elasticsearch/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java +++ b/elasticsearch/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java @@ -54,7 +54,7 @@ public class EmbeddedNode { .put("discovery.zen.ping.unicast.enabled", true) .put("discovery.zen.unicast.hosts", "127.0.0.1") .put("network.host", "127.0.0.1") - .put("gateway.type", "none") + .put("gateway.type", "local") .put("cluster.routing.schedule", "50ms") .put("path.plugins", pluginsFile.getAbsolutePath()) .build(); @@ -65,7 +65,7 @@ public class EmbeddedNode { builder.classLoader(Settings.class.getClassLoader()); node = new InternalNode(builder.build(), false); - LOGGER.info("Elasticsearch node started"); + LOGGER.info("Elasticsearch node created"); } public void start() throws Exception { http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/c305a7ac/elasticsearch/src/test/java/org/apache/karaf/decanter/elasticsearch/TestEmbeddedNode.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/test/java/org/apache/karaf/decanter/elasticsearch/TestEmbeddedNode.java b/elasticsearch/src/test/java/org/apache/karaf/decanter/elasticsearch/TestEmbeddedNode.java new file mode 100644 index 0000000..f97721e --- /dev/null +++ b/elasticsearch/src/test/java/org/apache/karaf/decanter/elasticsearch/TestEmbeddedNode.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.decanter.elasticsearch; + +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.client.Requests; +import org.elasticsearch.node.Node; + +import static org.junit.Assert.*; + +import org.junit.Test; + +public class TestEmbeddedNode { + + @Test + public void testNode() throws Exception { + + EmbeddedNode embeddedNode = new EmbeddedNode(); + Node node = embeddedNode.getNode(); + embeddedNode.start(); + ClusterHealthResponse healthResponse = node.client().admin().cluster().health(Requests.clusterHealthRequest()).actionGet(); + assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus()); + embeddedNode.stop(); + + } + + +}
