Repository: incubator-zeppelin Updated Branches: refs/heads/master e4f6f9cd3 -> 0849ea97a
Display aggregation result with Elasticsearch interpreter ### What is this PR for? Take aggregations in Elasticsearch search queries into account. With this improvement, the interpreter returns the result of the aggregation part of a search query, if there is one, instead of the search hits. ### What type of PR is it? Improvement ### Todos * [X ] - Modify interpreter to read and return the aggregation result, if any * [X ] - Update docs ### Is there a relevant Jira issue? No ### How should this be tested? Test against an Elasticsearch instance using the examples provided in the docs. ### Screenshots (if appropriate) ### Questions: * Do the license files need updating? No * Are there breaking changes for older versions? No * Does this need documentation? elasticsearch.md Author: Bruno Bonnin <[email protected]> Closes #596 from bbonnin/master and squashes the following commits: aa809e2 [Bruno Bonnin] Update elasticsearch.md 6793232 [Bruno Bonnin] Update elasticsearch.md 17f4e58 [Bruno Bonnin] Merge branch 'master' of https://github.com/bbonnin/incubator-zeppelin 6f76ffd [Bruno Bonnin] Support of aggregation results Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/0849ea97 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/0849ea97 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/0849ea97 Branch: refs/heads/master Commit: 0849ea97ac4df7d7e906cdab824cd78c55f59dd1 Parents: e4f6f9c Author: Bruno Bonnin <[email protected]> Authored: Mon Dec 28 15:47:42 2015 +0100 Committer: Alexander Bezzubov <[email protected]> Committed: Tue Jan 5 17:44:23 2016 +0900 ---------------------------------------------------------------------- .../elasticsearch-agg-multi-bucket-pie.png | Bin 0 -> 96634 bytes .../elasticsearch-agg-multi-value-metric.png | Bin 0 -> 121436 bytes .../docs-img/elasticsearch-count-with-query.png | Bin .../img/docs-img/elasticsearch-query-string.png | Bin
.../elasticsearch-search-json-query-table.png | Bin docs/interpreter/elasticsearch.md | 28 +++++- .../elasticsearch/ElasticsearchInterpreter.java | 86 ++++++++++++++----- .../ElasticsearchInterpreterTest.java | 58 ++++++++++--- 8 files changed, 132 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0849ea97/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-agg-multi-bucket-pie.png ---------------------------------------------------------------------- diff --git a/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-agg-multi-bucket-pie.png b/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-agg-multi-bucket-pie.png new file mode 100644 index 0000000..291e6ab Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-agg-multi-bucket-pie.png differ http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0849ea97/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-agg-multi-value-metric.png ---------------------------------------------------------------------- diff --git a/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-agg-multi-value-metric.png b/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-agg-multi-value-metric.png new file mode 100644 index 0000000..311c0d2 Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-agg-multi-value-metric.png differ http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0849ea97/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-count-with-query.png ---------------------------------------------------------------------- diff --git a/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-count-with-query.png b/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-count-with-query.png old mode 100755 new mode 100644 
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0849ea97/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-query-string.png ---------------------------------------------------------------------- diff --git a/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-query-string.png b/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-query-string.png old mode 100755 new mode 100644 http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0849ea97/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-search-json-query-table.png ---------------------------------------------------------------------- diff --git a/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-search-json-query-table.png b/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-search-json-query-table.png old mode 100755 new mode 100644 http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0849ea97/docs/interpreter/elasticsearch.md ---------------------------------------------------------------------- diff --git a/docs/interpreter/elasticsearch.md b/docs/interpreter/elasticsearch.md index 4a641c5..34a23ba 100644 --- a/docs/interpreter/elasticsearch.md +++ b/docs/interpreter/elasticsearch.md @@ -110,6 +110,7 @@ With the `search` command, you can send a search query to Elasticsearch. There a * This is a shortcut to a query like that: `{ "query": { "query_string": { "query": "__HERE YOUR QUERY__", "analyze_wildcard": true } } }` * See [Elasticsearch query string syntax](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#query-string-syntax) for more details about the content of such a query. + ```bash | %elasticsearch | search /index1,index2,.../type1,type2,... 
<JSON document containing the query or query_string elements> @@ -124,6 +125,9 @@ If you want to modify the size of the result set, you can add a line that is set ``` +> A search query can also contain [aggregations](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html). If there is at least one aggregation, the result of the first aggregation is shown, otherwise, you get the search hits. + + Examples: * With a JSON query: @@ -134,6 +138,15 @@ Examples: | | %elasticsearch | search /logs { "query": { "query_string": { "query": "request.method:GET AND status:200" } } } +| +| %elasticsearch +| search /logs { "aggs": { +| "content_length_stats": { +| "extended_stats": { +| "field": "content_length" +| } +| } +| } } ``` * With query_string elements: @@ -159,16 +172,17 @@ Suppose we have a JSON document: "url": "/zeppelin/4cd001cd-c517-4fa9-b8e5-a06b8f4056c4", "headers": [ "Accept: *.*", "Host: apache.org"] }, - "status": "403" + "status": "403", + "content_length": 1234 } ``` The data will be flattened like this: -date | request.headers[0] | request.headers[1] | request.method | request.url | status ------|--------------------|--------------------|----------------|-------------|------- -2015-12-08T21:03:13.588Z | Accept: \*.\* | Host: apache.org | GET | /zeppelin/4cd001cd-c517-4fa9-b8e5-a06b8f4056c4 | 403 +content_length | date | request.headers[0] | request.headers[1] | request.method | request.url | status +---------------|------|--------------------|--------------------|----------------|-------------|------- +1234 | 2015-12-08T21:03:13.588Z | Accept: \*.\* | Host: apache.org | GET | /zeppelin/4cd001cd-c517-4fa9-b8e5-a06b8f4056c4 | 403 Examples: @@ -185,6 +199,12 @@ Examples: * With a query string:  +* With a query containing a multi-value metric aggregation: + + +* With a query containing a multi-bucket aggregation: + + #### count With the `count` command, you can count documents available in some indices and types. 
You can also provide a query. http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0849ea97/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java index dba5b73..ac94abf 100644 --- a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java +++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java @@ -17,17 +17,10 @@ package org.apache.zeppelin.elasticsearch; -import java.io.IOException; -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.TreeSet; - +import com.github.wnameless.json.flattener.JsonFlattener; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonParseException; import org.apache.commons.lang.StringUtils; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -43,16 +36,21 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import 
org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.github.wnameless.json.flattener.JsonFlattener; - -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonParseException; +import java.io.IOException; +import java.net.InetAddress; +import java.util.*; /** @@ -313,7 +311,8 @@ public class ElasticsearchInterpreter extends Interpreter { * Processes a "search" request. * * @param urlItems Items of the URL - * @param data May contains the limit and the JSON of the request + * @param data May contains the JSON of the request + * @param size Limit of result set * @return Result of the search request, it contains a tab-formatted string of the matching hits */ private InterpreterResult processSearch(String[] urlItems, String data, int size) { @@ -325,10 +324,7 @@ public class ElasticsearchInterpreter extends Interpreter { final SearchResponse response = searchData(urlItems, data, size); - return new InterpreterResult( - InterpreterResult.Code.SUCCESS, - InterpreterResult.Type.TABLE, - buildResponseMessage(response.getHits().getHits())); + return buildResponseMessage(response); } /** @@ -419,7 +415,39 @@ public class ElasticsearchInterpreter extends Interpreter { return response; } - private String buildResponseMessage(SearchHit[] hits) { + private InterpreterResult buildAggResponseMessage(Aggregations aggregations) { + + // Only the result of the first aggregation is returned + // + final Aggregation agg = aggregations.asList().get(0); + InterpreterResult.Type resType = InterpreterResult.Type.TEXT; + String resMsg = ""; + + if (agg instanceof InternalMetricsAggregation) { + resMsg = XContentHelper.toString((InternalMetricsAggregation) agg).toString(); + } + else if (agg instanceof 
InternalSingleBucketAggregation) { + resMsg = XContentHelper.toString((InternalSingleBucketAggregation) agg).toString(); + } + else if (agg instanceof InternalMultiBucketAggregation) { + final StringBuffer buffer = new StringBuffer("key\tdoc_count"); + + final InternalMultiBucketAggregation multiBucketAgg = (InternalMultiBucketAggregation) agg; + for (MultiBucketsAggregation.Bucket bucket : multiBucketAgg.getBuckets()) { + buffer.append("\n") + .append(bucket.getKeyAsString()) + .append("\t") + .append(bucket.getDocCount()); + } + + resType = InterpreterResult.Type.TABLE; + resMsg = buffer.toString(); + } + + return new InterpreterResult(InterpreterResult.Code.SUCCESS, resType, resMsg); + } + + private String buildSearchHitsResponseMessage(SearchHit[] hits) { if (hits == null || hits.length == 0) { return ""; @@ -462,4 +490,18 @@ public class ElasticsearchInterpreter extends Interpreter { return buffer.toString(); } + + private InterpreterResult buildResponseMessage(SearchResponse response) { + + final Aggregations aggregations = response.getAggregations(); + + if (aggregations != null && aggregations.asList().size() > 0) { + return buildAggResponseMessage(aggregations); + } + + return new InterpreterResult( + InterpreterResult.Code.SUCCESS, + InterpreterResult.Type.TABLE, + buildSearchHitsResponseMessage(response.getHits().getHits())); + } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0849ea97/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java b/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java index 839be66..248258f 100644 --- a/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java +++ 
b/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java @@ -17,31 +17,27 @@ package org.apache.zeppelin.elasticsearch; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.junit.Assert.assertEquals; - -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import java.util.Arrays; -import java.util.Date; -import java.util.Properties; -import java.util.UUID; - import org.apache.commons.lang.math.RandomUtils; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.client.Client; -import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeBuilder; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import java.io.IOException; +import java.util.Arrays; +import java.util.Date; +import java.util.Properties; +import java.util.UUID; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.junit.Assert.assertEquals; + public class ElasticsearchInterpreterTest { private static Client elsClient; @@ -49,7 +45,7 @@ public class ElasticsearchInterpreterTest { private static ElasticsearchInterpreter interpreter; private static final String[] METHODS = { "GET", "PUT", "DELETE", "POST" }; - private static final String[] STATUS = { "200", "404", "500", "403" }; + private static final int[] STATUS = { 200, 404, 500, 403 }; private static final String ELS_CLUSTER_NAME = "zeppelin-elasticsearch-interpreter-test"; private static final String ELS_HOST = "localhost"; @@ -71,6 +67,14 @@ public class ElasticsearchInterpreterTest { elsNode = 
NodeBuilder.nodeBuilder().settings(settings).node(); elsClient = elsNode.client(); + + elsClient.admin().indices().prepareCreate("logs") + .addMapping("http", jsonBuilder() + .startObject().startObject("http").startObject("properties") + .startObject("content_length") + .field("type", "integer") + .endObject() + .endObject().endObject().endObject()).get(); for (int i = 0; i < 50; i++) { elsClient.prepareIndex("logs", "http", "" + i) @@ -84,6 +88,7 @@ public class ElasticsearchInterpreterTest { .field("headers", Arrays.asList("Accept: *.*", "Host: apache.org")) .endObject() .field("status", STATUS[RandomUtils.nextInt(STATUS.length)]) + .field("content_length", RandomUtils.nextInt(2000)) ) .get(); } @@ -147,6 +152,31 @@ public class ElasticsearchInterpreterTest { res = interpreter.interpret("search /logs status:404", null); assertEquals(Code.SUCCESS, res.code()); } + + @Test + public void testAgg() { + + // Single-value metric + InterpreterResult res = interpreter.interpret("search /logs { \"aggs\" : { \"distinct_status_count\" : " + + " { \"cardinality\" : { \"field\" : \"status\" } } } }", null); + assertEquals(Code.SUCCESS, res.code()); + + // Multi-value metric + res = interpreter.interpret("search /logs { \"aggs\" : { \"content_length_stats\" : " + + " { \"extended_stats\" : { \"field\" : \"content_length\" } } } }", null); + assertEquals(Code.SUCCESS, res.code()); + + // Single bucket + res = interpreter.interpret("search /logs { \"aggs\" : { " + + " \"200_OK\" : { \"filter\" : { \"term\": { \"status\": \"200\" } }, " + + " \"aggs\" : { \"avg_length\" : { \"avg\" : { \"field\" : \"content_length\" } } } } } }", null); + assertEquals(Code.SUCCESS, res.code()); + + // Multi-buckets + res = interpreter.interpret("search /logs { \"aggs\" : { \"status_count\" : " + + " { \"terms\" : { \"field\" : \"status\" } } } }", null); + assertEquals(Code.SUCCESS, res.code()); + } @Test public void testIndex() {
