Repository: incubator-streams Updated Branches: refs/heads/master 6dd1ea51c -> 2fc9d0e2d
STREAMS-194 | Added constructor to allow for the overriding of the default Elasticsearch percolate field Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b386a52e Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b386a52e Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b386a52e Branch: refs/heads/master Commit: b386a52e248b45436b017c0ac2d63361cb35195a Parents: a7a4012 Author: Robert Douglas <[email protected]> Authored: Mon Oct 13 11:09:01 2014 -0500 Committer: Robert Douglas <[email protected]> Committed: Mon Oct 13 11:09:01 2014 -0500 ---------------------------------------------------------------------- .../processor/PercolateTagProcessor.java | 64 ++++---------------- .../processor/PercolateTagProcessorTest.java | 46 ++++++++++++++ 2 files changed, 57 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b386a52e/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java index 389f390..0b62ce9 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java @@ -39,8 +39,6 @@ import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsReques import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.percolate.PercolateRequestBuilder; import org.elasticsearch.action.percolate.PercolateResponse; import org.elasticsearch.action.search.SearchRequestBuilder; @@ -68,8 +66,8 @@ import java.util.*; public class PercolateTagProcessor implements StreamsProcessor { public static final String STREAMS_ID = "PercolateTagProcessor"; - private final static Logger LOGGER = LoggerFactory.getLogger(PercolateTagProcessor.class); + private final static String DEFAULT_PERCOLATE_FIELD = "_all"; private ObjectMapper mapper; @@ -81,9 +79,15 @@ public class PercolateTagProcessor implements StreamsProcessor { private ElasticsearchWriterConfiguration config; private ElasticsearchClientManager manager; private BulkRequestBuilder bulkBuilder; + protected String usePercolateField; public PercolateTagProcessor(ElasticsearchWriterConfiguration config) { + this(config, DEFAULT_PERCOLATE_FIELD); + } + + public PercolateTagProcessor(ElasticsearchWriterConfiguration config, String defaultPercolateField) { this.config = config; + this.usePercolateField = defaultPercolateField; } public ElasticsearchClientManager getManager() { @@ -201,7 +205,7 @@ public class PercolateTagProcessor implements StreamsProcessor { deleteOldQueries(config.getIndex()); for (String tag : config.getTags().getAdditionalProperties().keySet()) { String query = (String)config.getTags().getAdditionalProperties().get(tag); - PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag, query); + PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag, query, this.usePercolateField); addPercolateRule(queryBuilder, config.getIndex()); } if (writePercolateRules() == true) @@ -314,59 +318,15 @@ public class PercolateTagProcessor implements StreamsProcessor { return !response.hasFailures(); } -// public static class PercolateQueryBuilder { -// -// private BoolQueryBuilder queryBuilder; -// private String id; -// -// public PercolateQueryBuilder(String id) { -// this.id = id; -// this.queryBuilder = QueryBuilders.boolQuery(); -// } -// -// public void setMinumumNumberShouldMatch(int shouldMatch) { -// this.queryBuilder.minimumNumberShouldMatch(shouldMatch); -// } -// -// -// public void addQuery(String query, FilterLevel level, String... fields) { -// QueryStringQueryBuilder builder = QueryBuilders.queryString(query); -// if(fields != null && fields.length > 0) { -// for(String field : fields) { -// builder.field(field); -// } -// } -// switch (level) { -// case MUST: -// this.queryBuilder.must(builder); -// break; -// case SHOULD: -// this.queryBuilder.should(builder); -// break; -// case MUST_NOT: -// this.queryBuilder.mustNot(builder); -// } -// } -// -// public String getId() { -// return this.id; -// } -// -// public String getSource() { -// return "{ \n\"query\" : "+this.queryBuilder.toString()+"\n}"; -// } -// -// -// } - public static class PercolateQueryBuilder { private QueryStringQueryBuilder queryBuilder; private String id; - public PercolateQueryBuilder(String id, String query) { + public PercolateQueryBuilder(String id, String query, String defaultPercolateField) { this.id = id; this.queryBuilder = QueryBuilders.queryString(query); + this.queryBuilder.defaultField(defaultPercolateField); } public String getId() { @@ -379,9 +339,7 @@ public class PercolateTagProcessor implements StreamsProcessor { } - public enum FilterLevel { MUST, SHOULD, MUST_NOT } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b386a52e/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessorTest.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessorTest.java new file mode 100644 index 0000000..5b14b29 --- /dev/null +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessorTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.elasticsearch.processor; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class PercolateTagProcessorTest { + private final String id = "test_id"; + private final String query = "test_query"; + private final String defaultPercolateField = "activity.content"; + + private final String expectedResults = "{ \n" + + "\"query\" : {\n" + + " \"query_string\" : {\n" + + " \"query\" : \"test_query\",\n" + + " \"default_field\" : \"activity.content\"\n" + + " }\n" + + "}\n" + + "}"; + + @Test + public void percolateTagProcessorQueryBuilderTest() { + PercolateTagProcessor.PercolateQueryBuilder percolateQueryBuilder = new PercolateTagProcessor.PercolateQueryBuilder(id, query, defaultPercolateField); + + assertEquals(id, percolateQueryBuilder.getId()); + assertEquals(expectedResults, percolateQueryBuilder.getSource()); + } +}
