Repository: nifi Updated Branches: refs/heads/master f7035f049 -> 3a29c1e4c
NIFI-5052 Added DeleteByQuery ElasticSearch processor. NIFI-5052 fixed checkstyle issue. NIFI-5052 Added changes requested in a code review. NIFI-5052 Fixed a typo. NIFI-5052 Added changes requested in a code review. Signed-off-by: Matthew Burgess <[email protected]> This closes #2616 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3a29c1e4 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3a29c1e4 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3a29c1e4 Branch: refs/heads/master Commit: 3a29c1e4ca703a8d042438cd7b576485371dacbd Parents: f7035f0 Author: Mike Thomsen <[email protected]> Authored: Sat Apr 7 06:03:01 2018 -0400 Committer: Matthew Burgess <[email protected]> Committed: Wed May 23 09:14:04 2018 -0400 ---------------------------------------------------------------------- .../elasticsearch/DeleteOperationResponse.java | 30 ++++ .../ElasticSearchClientService.java | 62 +++++++ .../elasticsearch/IndexOperationRequest.java | 50 ++++++ .../elasticsearch/IndexOperationResponse.java | 36 ++++ .../nifi-elasticsearch-client-service/pom.xml | 18 +- .../ElasticSearchClientServiceImpl.java | 83 +++++++++- .../ElasticSearch5ClientService_IT.java | 105 ------------ .../ElasticSearchClientService_IT.java | 165 +++++++++++++++++++ .../src/test/resources/setup.script | 30 ++-- .../DeleteByQueryElasticsearch.java | 145 ++++++++++++++++ .../ElasticSearchRestProcessor.java | 91 ++++++++++ .../elasticsearch/JsonQueryElasticsearch.java | 71 +------- .../org.apache.nifi.processor.Processor | 2 + .../DeleteByQueryElasticsearchTest.java | 134 +++++++++++++++ .../TestElasticSearchClientService.java | 47 +++++- 15 files changed, 866 insertions(+), 203 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/DeleteOperationResponse.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/DeleteOperationResponse.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/DeleteOperationResponse.java new file mode 100644 index 0000000..1fd83d7 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/DeleteOperationResponse.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch; + +public class DeleteOperationResponse { + private long took; + + public DeleteOperationResponse(long took) { + this.took = took; + } + + public long getTook() { + return took; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java index 6854c55..188c7bb 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java @@ -25,6 +25,8 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.ssl.SSLContextService; import java.io.IOException; +import java.util.List; +import java.util.Map; @Tags({"elasticsearch", "client"}) @CapabilityDescription("A controller service for accessing an ElasticSearch client.") @@ -96,6 +98,66 @@ public interface ElasticSearchClientService extends ControllerService { .build(); /** + * Index a document. + * + * @param operation A document to index. + * @return IndexOperationResponse if successful + * @throws IOException thrown when there is an error. + */ + IndexOperationResponse add(IndexOperationRequest operation) throws IOException; + + /** + * Index multiple documents. + * + * @param operations A list of documents to index. + * @return IndexOperationResponse if successful. + * @throws IOException thrown when there is an error. + */ + IndexOperationResponse add(List<IndexOperationRequest> operations) throws IOException; + + /** + * Delete a document by its ID from an index. + * + * @param index The index to target. + * @param type The type to target. Optional. + * @param id The document ID to remove from the selected index. + * @return A DeleteOperationResponse object if successful. + */ + DeleteOperationResponse deleteById(String index, String type, String id) throws IOException; + + + /** + * Delete multiple documents by ID from an index. + * @param index The index to target. + * @param type The type to target. Optional. + * @param ids A list of document IDs to remove from the selected index. + * @return A DeleteOperationResponse object if successful. + * @throws IOException thrown when there is an error. + */ + DeleteOperationResponse deleteById(String index, String type, List<String> ids) throws IOException; + + /** + * Delete documents by query. + * + * @param query A valid JSON query to be used for finding documents to delete. + * @param index The index to target. + * @param type The type to target within the index. Optional. + * @return A DeleteOperationResponse object if successful. + */ + DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException; + + /** + * Get a document by ID. + * + * @param index The index that holds the document. + * @param type The document type. Optional. + * @param id The document ID + * @return Map if successful, null if not found. + * @throws IOException thrown when there is an error. + */ + Map<String, Object> get(String index, String type, String id) throws IOException; + + /** * Perform a search using the JSON DSL. * * @param query A JSON string reprensenting the query. http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java new file mode 100644 index 0000000..9281adb --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch; + +import java.util.Map; + +public class IndexOperationRequest { + private String index; + private String type; + private String id; + private Map<String, Object> fields; + + public IndexOperationRequest(String index, String type, String id, Map<String, Object> fields) { + this.index = index; + this.type = type; + this.id = id; + this.fields = fields; + } + + public String getIndex() { + return index; + } + + public String getType() { + return type; + } + + public String getId() { + return id; + } + + public Map<String, Object> getFields() { + return fields; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java new file mode 100644 index 0000000..a22b7aa --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch; + +public class IndexOperationResponse { + private long took; + private long ingestTook; + + public IndexOperationResponse(long took, long ingestTook) { + this.took = took; + this.ingestTook = ingestTook; + } + + public long getTook() { + return took; + } + + public long getIngestTook() { + return ingestTook; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml index 8f52ab8..db79b05 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml @@ -70,11 +70,6 @@ </dependency> <dependency> - <groupId>org.elasticsearch.client</groupId> - <artifactId>rest</artifactId> - <version>5.0.1</version> - </dependency> - <dependency> <groupId>com.github.stephenc.findbugs</groupId> <artifactId>findbugs-annotations</artifactId> <version>1.3.9-1</version> @@ -120,6 +115,18 @@ <version>1.7.0-SNAPSHOT</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + <version>5.6.8</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.elasticsearch.client</groupId> + <artifactId>elasticsearch-rest-high-level-client</artifactId> + <version>5.6.8</version> + <scope>compile</scope> + </dependency> </dependencies> <profiles> @@ -137,7 +144,6 @@ <httpPort>9400</httpPort> <version>5.6.2</version> <timeout>90</timeout> - <pathInitScript>${project.basedir}/src/test/resources/setup.script</pathInitScript> </configuration> <executions> <execution> http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java index 9896a19..111490b 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java @@ -19,6 +19,7 @@ package org.apache.nifi.elasticsearch; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.io.IOUtils; +import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; @@ -34,9 +35,17 @@ import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.ssl.SSLContextService; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; @@ -55,6 +64,7 @@ import java.security.SecureRandom; import java.security.UnrecoverableKeyException; import java.security.cert.CertificateException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -66,6 +76,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im static final private List<PropertyDescriptor> properties; private RestClient client; + private RestHighLevelClient highLevelClient; private String url; private Charset charset; @@ -182,18 +193,19 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im .setMaxRetryTimeoutMillis(retryTimeout); this.client = builder.build(); + this.highLevelClient = new RestHighLevelClient(client); } - private Response runQuery(String query, String index, String type) throws IOException { + private Response runQuery(String endpoint, String query, String index, String type) throws IOException { StringBuilder sb = new StringBuilder() - .append("/") - .append(index); + .append("/") + .append(index); if (type != null && !type.equals("")) { sb.append("/") - .append(type); + .append(type); } - sb.append("/_search"); + sb.append(String.format("/%s", endpoint)); HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON); @@ -216,8 +228,67 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im } @Override + public IndexOperationResponse add(IndexOperationRequest operation) throws IOException { + return add(Arrays.asList(operation)); + } + + @Override + public IndexOperationResponse add(List<IndexOperationRequest> operations) throws IOException { + BulkRequest bulkRequest = new BulkRequest(); + for (int index = 0; index < operations.size(); index++) { + IndexOperationRequest or = operations.get(index); + IndexRequest indexRequest = new IndexRequest(or.getIndex(), or.getType(), or.getId()) + .source(or.getFields()); + bulkRequest.add(indexRequest); + } + + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + BulkResponse response = highLevelClient.bulk(bulkRequest); + IndexOperationResponse retVal = new IndexOperationResponse(response.getTookInMillis(), response.getIngestTookInMillis()); + + return retVal; + } + + @Override + public DeleteOperationResponse deleteById(String index, String type, String id) throws IOException { + return deleteById(index, type, Arrays.asList(id)); + } + + @Override + public DeleteOperationResponse deleteById(String index, String type, List<String> ids) throws IOException { + BulkRequest bulk = new BulkRequest(); + for (int idx = 0; idx < ids.size(); idx++) { + DeleteRequest request = new DeleteRequest(index, type, ids.get(idx)); + bulk.add(request); + } + BulkResponse response = highLevelClient.bulk(bulk); + + DeleteOperationResponse dor = new DeleteOperationResponse(response.getTookInMillis()); + + return dor; + } + + @Override + public DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException { + long start = System.currentTimeMillis(); + Response response = runQuery("_delete_by_query", query, index, type); + long end = System.currentTimeMillis(); + Map<String, Object> parsed = parseResponse(response); + + return new DeleteOperationResponse(end - start); + } + + @Override + public Map<String, Object> get(String index, String type, String id) throws IOException { + GetRequest get = new GetRequest(index, type, id); + GetResponse resp = highLevelClient.get(get, new Header[]{}); + return resp.getSource(); + } + + @Override public SearchResponse search(String query, String index, String type) throws IOException { - Response response = runQuery(query, index, type); + Response response = runQuery("_search", query, index, type); Map<String, Object> parsed = parseResponse(response); int took = (Integer)parsed.get("took"); http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.java deleted file mode 100644 index 7ab618d..0000000 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.elasticsearch.integration; - -import org.apache.nifi.elasticsearch.ElasticSearchClientService; -import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl; -import org.apache.nifi.elasticsearch.SearchResponse; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class ElasticSearch5ClientService_IT { - - private TestRunner runner; - private ElasticSearchClientServiceImpl service; - - @Before - public void before() throws Exception { - runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class); - service = new ElasticSearchClientServiceImpl(); - runner.addControllerService("Client Service", service); - runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, "http://localhost:9400"); - runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000"); - runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000"); - runner.setProperty(service, ElasticSearchClientService.RETRY_TIMEOUT, "60000"); - try { - runner.enableControllerService(service); - } catch (Exception ex) { - ex.printStackTrace(); - throw ex; - } - } - - @After - public void after() throws Exception { - service.onDisabled(); - } - - @Test - public void testBasicSearch() throws Exception { - String query = "{\n" + - "\t\"size\": 10,\n" + - "\t\"query\": {\n" + - "\t\t\"match_all\": {}\n" + - "\t},\n" + - "\t\"aggs\": {\n" + - "\t\t\"term_counts\": {\n" + - "\t\t\t\"terms\": {\n" + - "\t\t\t\t\"field\": \"msg\",\n" + - "\t\t\t\t\"size\": 5\n" + - "\t\t\t}\n" + - "\t\t}\n" + - "\t}\n" + - "}"; - SearchResponse response = service.search(query, "messages", "message"); - Assert.assertNotNull("Response was null", response); - - Assert.assertEquals("Wrong count", response.getNumberOfHits(), 15); - Assert.assertFalse("Timed out", response.isTimedOut()); - Assert.assertNotNull("Hits was null", response.getHits()); - Assert.assertEquals("Wrong number of hits", 10, response.getHits().size()); - Assert.assertNotNull("Aggregations are missing", response.getAggregations()); - Assert.assertEquals("Aggregation count is wrong", 1, response.getAggregations().size()); - - Map<String, Object> termCounts = (Map<String, Object>) response.getAggregations().get("term_counts"); - Assert.assertNotNull("Term counts was missing", termCounts); - List<Map<String, Object>> buckets = (List<Map<String, Object>>) termCounts.get("buckets"); - Assert.assertNotNull("Buckets branch was empty", buckets); - Map<String, Integer> expected = new HashMap<>(); - expected.put("one", 1); - expected.put("two", 2); - expected.put("three", 3); - expected.put("four", 4); - expected.put("five", 5); - - for (Map<String, Object> aggRes : buckets) { - String key = (String)aggRes.get("key"); - Integer docCount = (Integer)aggRes.get("doc_count"); - - Assert.assertEquals(String.format("%s did not match", key), expected.get(key), docCount); - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java new file mode 100644 index 0000000..687faf0 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch.integration; + +import org.apache.nifi.elasticsearch.DeleteOperationResponse; +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl; +import org.apache.nifi.elasticsearch.IndexOperationRequest; +import org.apache.nifi.elasticsearch.SearchResponse; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ElasticSearchClientService_IT { + + private TestRunner runner; + private ElasticSearchClientServiceImpl service; + + static final String INDEX = "messages"; + static final String TYPE = "message"; + + @Before + public void before() throws Exception { + runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class); + service = new ElasticSearchClientServiceImpl(); + runner.addControllerService("Client Service", service); + runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, "http://localhost:9400"); + runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000"); + runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000"); + runner.setProperty(service, ElasticSearchClientService.RETRY_TIMEOUT, "60000"); + try { + runner.enableControllerService(service); + } catch (Exception ex) { + ex.printStackTrace(); + throw ex; + } + + Map<String, Integer> expected = new HashMap<>(); + expected.put("one", 1); + expected.put("two", 2); + expected.put("three", 3); + expected.put("four", 4); + expected.put("five", 5); + + + int index = 1; + List<IndexOperationRequest> docs = new ArrayList<>(); + for (Map.Entry<String, Integer> entry : expected.entrySet()) { + for (int idx = 0; idx < entry.getValue(); idx++) { + Map<String, Object> fields = new HashMap<>(); + fields.put("msg", entry.getKey()); + IndexOperationRequest ior = new IndexOperationRequest(INDEX, TYPE, String.valueOf(index++), fields); + docs.add(ior); + } + } + service.add(docs); + } + + @After + public void after() throws Exception { + service.onDisabled(); + } + + @Test + public void testBasicSearch() throws Exception { + String query = "{\n" + + "\t\"size\": 10,\n" + + "\t\"query\": {\n" + + "\t\t\"match_all\": {}\n" + + "\t},\n" + + "\t\"aggs\": {\n" + + "\t\t\"term_counts\": {\n" + + "\t\t\t\"terms\": {\n" + + "\t\t\t\t\"field\": \"msg.keyword\",\n" + + "\t\t\t\t\"size\": 5\n" + + "\t\t\t}\n" + + "\t\t}\n" + + "\t}\n" + + "}"; + SearchResponse response = service.search(query, INDEX, TYPE); + Assert.assertNotNull("Response was null", response); + + Assert.assertEquals("Wrong count", 15, response.getNumberOfHits()); + Assert.assertFalse("Timed out", response.isTimedOut()); + Assert.assertNotNull("Hits was null", response.getHits()); + Assert.assertEquals("Wrong number of hits", 10, response.getHits().size()); + Assert.assertNotNull("Aggregations are missing", response.getAggregations()); + Assert.assertEquals("Aggregation count is wrong", 1, response.getAggregations().size()); + + Map<String, Object> termCounts = (Map<String, Object>) response.getAggregations().get("term_counts"); + Assert.assertNotNull("Term counts was missing", termCounts); + List<Map<String, Object>> buckets = (List<Map<String, Object>>) termCounts.get("buckets"); + Assert.assertNotNull("Buckets branch was empty", buckets); + Map<String, Integer> expected = new HashMap<>(); + expected.put("one", 1); + expected.put("two", 2); + expected.put("three", 3); + expected.put("four", 4); + expected.put("five", 5); + + for (Map<String, Object> aggRes : buckets) { + String key = (String)aggRes.get("key"); + Integer docCount = (Integer)aggRes.get("doc_count"); + + Assert.assertEquals(String.format("%s did not match", key), expected.get(key), docCount); + } + } + + @Test + public void testDeleteByQuery() throws Exception { + String query = "{\"query\":{\"match\":{\"msg\":\"five\"}}}"; + DeleteOperationResponse response = service.deleteByQuery(query, INDEX, TYPE); + Assert.assertNotNull(response); + Assert.assertTrue(response.getTook() > 0); + } + + @Test + public void testDeleteById() throws Exception { + final String ID = "1"; + DeleteOperationResponse response = service.deleteById(INDEX, TYPE, ID); + Assert.assertNotNull(response); + Assert.assertTrue(response.getTook() > 0); + Map<String, Object> doc = service.get(INDEX, TYPE, ID); + Assert.assertNull(doc); + doc = service.get(INDEX, TYPE, "2"); + Assert.assertNotNull(doc); + } + + @Test + public void testGet() throws IOException { + Map<String, Object> old = null; + for (int index = 1; index <= 15; index++) { + String id = String.valueOf(index); + Map<String, Object> doc = service.get(INDEX, TYPE, id); + Assert.assertNotNull(doc); + Assert.assertNotNull(doc.toString() + "\t" + doc.keySet().toString(), doc.get("msg")); + Assert.assertFalse(doc == old); + old = doc; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script index 69671b6..8cf4c97 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script @@ -15,18 +15,18 @@ #create mapping PUT:messages/:{ "mappings":{"message":{ "properties":{ "msg":{"type":"keyword"}}}}} #add document -PUT:messages/message/1:{ "msg ":"one" } -PUT:messages/message/2:{ "msg ":"two" } -PUT:messages/message/3:{ "msg ":"two" } -PUT:messages/message/4:{ "msg ":"three" } -PUT:messages/message/5:{ "msg ":"three" } -PUT:messages/message/6:{ "msg ":"three" } -PUT:messages/message/7:{ "msg ":"four" } -PUT:messages/message/8:{ "msg ":"four" } -PUT:messages/message/9:{ "msg ":"four" } -PUT:messages/message/10:{ "msg ":"four" } -PUT:messages/message/11:{ "msg ":"five" } -PUT:messages/message/12:{ "msg ":"five" } -PUT:messages/message/13:{ "msg ":"five" } -PUT:messages/message/14:{ "msg ":"five" } -PUT:messages/message/15:{ "msg ":"five" } \ No newline at end of file +PUT:messages/message/1:{ "msg":"one" } +PUT:messages/message/2:{ "msg":"two" } +PUT:messages/message/3:{ "msg":"two" } +PUT:messages/message/4:{ "msg":"three" } +PUT:messages/message/5:{ "msg":"three" } +PUT:messages/message/6:{ "msg":"three" } +PUT:messages/message/7:{ "msg":"four" } +PUT:messages/message/8:{ "msg":"four" } +PUT:messages/message/9:{ "msg":"four" } +PUT:messages/message/10:{ "msg":"four" } +PUT:messages/message/11:{ "msg":"five" } +PUT:messages/message/12:{ "msg":"five" } +PUT:messages/message/13:{ "msg":"five" } +PUT:messages/message/14:{ "msg":"five" } +PUT:messages/message/15:{ "msg":"five" } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java new file mode 100644 index 0000000..609d8ed --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.elasticsearch; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.elasticsearch.DeleteOperationResponse; +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.StringUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@CapabilityDescription("Delete from an ElasticSearch index using a query. The query can be loaded from a flowfile body " + + "or from the Query parameter.") +@Tags({ "elastic", "elasticsearch", "delete", "query"}) +@WritesAttributes({ + @WritesAttribute(attribute = "elasticsearch.delete.took", description = "The amount of time that it took to complete the delete operation in ms."), + @WritesAttribute(attribute = "elasticsearch.delete.error", description = "The error message provided by ElasticSearch if there is an error running the delete.") +}) +public class DeleteByQueryElasticsearch extends AbstractProcessor implements ElasticSearchRestProcessor { + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("If the delete by query fails, and a flowfile was read, it will be sent to this relationship.").build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("If the delete by query succeeds, and a flowfile was read, it will be sent to this relationship.") + .build(); + + + static final String TOOK_ATTRIBUTE = "elasticsearch.delete.took"; + static final String ERROR_ATTRIBUTE = "elasticsearch.delete.error"; + + private static final Set<Relationship> relationships; + private static final List<PropertyDescriptor> propertyDescriptors; + + private volatile ElasticSearchClientService clientService; + + static { + final Set<Relationship> _rels = new HashSet<>(); + _rels.add(REL_SUCCESS); + _rels.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(_rels); + + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(QUERY); + descriptors.add(QUERY_ATTRIBUTE); + descriptors.add(INDEX); + descriptors.add(TYPE); + descriptors.add(CLIENT_SERVICE); + + propertyDescriptors = Collections.unmodifiableList(descriptors); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile input = null; + if (context.hasIncomingConnection()) { + input = session.get(); + + if (input == null && context.hasNonLoopConnection()) { + return; + } + } + + try { + final String query = getQuery(input, context, session); + final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue(); + final String type = context.getProperty(TYPE).isSet() + ? context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue() + : null; + final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet() + ? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue() + : null; + DeleteOperationResponse dor = clientService.deleteByQuery(query, index, type); + + if (input == null) { + input = session.create(); + } + + Map<String, String> attrs = new HashMap<>(); + attrs.put(TOOK_ATTRIBUTE, String.valueOf(dor.getTook())); + if (!StringUtils.isBlank(queryAttr)) { + attrs.put(queryAttr, query); + } + + input = session.putAllAttributes(input, attrs); + + session.transfer(input, REL_SUCCESS); + } catch (Exception e) { + if (input != null) { + input = session.putAttribute(input, ERROR_ATTRIBUTE, e.getMessage()); + session.transfer(input, REL_FAILURE); + } + getLogger().error("Error running delete by query: ", e); + context.yield(); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticSearchRestProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticSearchRestProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticSearchRestProcessor.java new file mode 100644 index 0000000..e94a1cc --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticSearchRestProcessor.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.elasticsearch; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +public interface ElasticSearchRestProcessor { + PropertyDescriptor INDEX = new PropertyDescriptor.Builder() + .name("el-rest-fetch-index") + .displayName("Index") + .description("The name of the index to use.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + PropertyDescriptor TYPE = new PropertyDescriptor.Builder() + .name("el-rest-type") + .displayName("Type") + .description("The type of this document (used by Elasticsearch for indexing and searching)") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + PropertyDescriptor QUERY = new PropertyDescriptor.Builder() + .name("el-rest-query") + .displayName("Query") + .description("A query in JSON syntax, not Lucene syntax. Ex: {\"query\":{\"match\":{\"somefield\":\"somevalue\"}}}. " + + "If this parameter is not set, the query will be read from the flowfile content.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + PropertyDescriptor QUERY_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("el-query-attribute") + .displayName("Query Attribute") + .description("If set, the executed query will be set on each result flowfile in the specified attribute.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(Validator.VALID) + .required(false) + .build(); + + PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder() + .name("el-rest-client-service") + .displayName("Client Service") + .description("An ElasticSearch client service to use for running queries.") + .identifiesControllerService(ElasticSearchClientService.class) + .required(true) + .build(); + + default String getQuery(FlowFile input, ProcessContext context, ProcessSession session) throws IOException { + String retVal = null; + if (context.getProperty(QUERY).isSet()) { + retVal = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue(); + } else if (input != null) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + session.exportTo(input, out); + out.close(); + + retVal = new String(out.toByteArray()); + } + + return retVal; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java index 64be962..40d6118 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java @@ -27,7 +27,6 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.Validator; import org.apache.nifi.elasticsearch.ElasticSearchClientService; import org.apache.nifi.elasticsearch.SearchResponse; import org.apache.nifi.expression.ExpressionLanguageScope; @@ -38,10 +37,8 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StringUtils; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -62,7 +59,7 @@ import java.util.Set; "ElasticSearch JSON DSL. It does not automatically paginate queries for the user. If an incoming relationship is added to this " + "processor, it will use the flowfile's content for the query. Care should be taken on the size of the query because the entire response " + "from ElasticSearch will be loaded into memory all at once and converted into the resulting flowfiles.") -public class JsonQueryElasticsearch extends AbstractProcessor { +public class JsonQueryElasticsearch extends AbstractProcessor implements ElasticSearchRestProcessor { public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original") .description("All original flowfiles that don't cause an error to occur go to this relationship. " + "This applies even if you select the \"split up hits\" option to send individual hits to the " + @@ -78,49 +75,6 @@ public class JsonQueryElasticsearch extends AbstractProcessor { public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations") .description("Aggregations are routed to this relationship.") .build(); - - public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder() - .name("el-rest-fetch-index") - .displayName("Index") - .description("The name of the index to read from") - .required(true) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder() - .name("el-rest-type") - .displayName("Type") - .description("The type of this document (used by Elasticsearch for indexing and searching)") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() - .name("el-rest-query") - .displayName("Query") - .description("A query in JSON syntax, not Lucene syntax. Ex: " + - "{\n" + - "\t\"query\": {\n" + - "\t\t\"match\": {\n" + - "\t\t\t\"name\": \"John Smith\"\n" + - "\t\t}\n" + - "\t}\n" + - "}") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor QUERY_ATTRIBUTE = new PropertyDescriptor.Builder() - .name("el-query-attribute") - .displayName("Query Attribute") - .description("If set, the executed query will be set on each result flowfile in the specified attribute.") - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(Validator.VALID) - .required(false) - .build(); - public static final AllowableValue SPLIT_UP_YES = new AllowableValue( "splitUp-yes", "Yes", @@ -151,14 +105,6 @@ public class JsonQueryElasticsearch extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build(); - public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder() - .name("el-rest-client-service") - .displayName("Client Service") - .description("An ElasticSearch client service to use for running queries.") - .identifiesControllerService(ElasticSearchClientService.class) - .required(true) - .build(); - private static final Set<Relationship> relationships; private static final List<PropertyDescriptor> propertyDescriptors; @@ -207,21 +153,6 @@ public class JsonQueryElasticsearch extends AbstractProcessor { private final ObjectMapper mapper = new ObjectMapper(); - private String getQuery(FlowFile input, ProcessContext context, ProcessSession session) throws IOException { - String retVal = null; - if (context.getProperty(QUERY).isSet()) { - retVal = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue(); - } else if (input != null) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - session.exportTo(input, out); - out.close(); - - retVal = new String(out.toByteArray()); - } - - return retVal; - } - @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile input = null; http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index d57ff36..ac8c915 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -12,4 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +org.apache.nifi.processors.elasticsearch.DeleteByQueryElasticsearch org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.java new file mode 100644 index 0000000..ade1102 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.elasticsearch; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DeleteByQueryElasticsearchTest { + private static final String INDEX = "test_idx"; + private static final String TYPE = "test_type"; + private static final String QUERY_ATTR = "es.delete.query"; + private static final String CLIENT_NAME = "clientService"; + + private TestElasticSearchClientService client; + + private void initClient(TestRunner runner) throws Exception { + client = new TestElasticSearchClientService(true); + runner.addControllerService(CLIENT_NAME, client); + runner.enableControllerService(client); + runner.setProperty(DeleteByQueryElasticsearch.CLIENT_SERVICE, CLIENT_NAME); + } + + private void postTest(TestRunner runner, String queryParam) { + runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 0); + runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 1); + + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_SUCCESS); + String attr = flowFiles.get(0).getAttribute(DeleteByQueryElasticsearch.TOOK_ATTRIBUTE); + String query = flowFiles.get(0).getAttribute(QUERY_ATTR); + Assert.assertNotNull(attr); + Assert.assertEquals(attr, "100"); + Assert.assertNotNull(query); + Assert.assertEquals(queryParam, query); + } + + @Test + public void testWithFlowfileInput() throws Exception { + String query = "{ \"query\": { \"match_all\": {} }}"; + TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class); + runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX); + runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE); + runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR); + initClient(runner); + runner.assertValid(); + runner.enqueue(query); + runner.run(); + + postTest(runner, query); + } + + @Test + public void testWithQuery() throws Exception { + String query = "{\n" + + "\t\"query\": {\n" + + "\t\t\"match\": {\n" + + "\t\t\t\"${field.name}.keyword\": \"test\"\n" + + "\t\t}\n" + + "\t}\n" + + "}"; + Map<String, String> attrs = new HashMap<String, String>(){{ + put("field.name", "test_field"); + }}; + TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class); + runner.setProperty(DeleteByQueryElasticsearch.QUERY, query); + runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX); + runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE); + runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR); + initClient(runner); + runner.assertValid(); + runner.enqueue("", attrs); + runner.run(); + + postTest(runner, query.replace("${field.name}", "test_field")); + + runner.clearTransferState(); + + query = "{\n" + + "\t\"query\": {\n" + + "\t\t\"match\": {\n" + + "\t\t\t\"test_field.keyword\": \"test\"\n" + + "\t\t}\n" + + "\t}\n" + + "}"; + runner.setProperty(DeleteByQueryElasticsearch.QUERY, query); + runner.setIncomingConnection(false); + runner.assertValid(); + runner.run(); + postTest(runner, query); + } + + @Test + public void testErrorAttribute() throws Exception { + String query = "{ \"query\": { \"match_all\": {} }}"; + TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class); + runner.setProperty(DeleteByQueryElasticsearch.QUERY, query); + runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX); + runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE); + runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR); + initClient(runner); + client.setThrowErrorInDelete(true); + runner.assertValid(); + runner.enqueue(""); + runner.run(); + + runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 0); + runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 1); + + MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_FAILURE).get(0); + String attr = mockFlowFile.getAttribute(DeleteByQueryElasticsearch.ERROR_ATTRIBUTE); + Assert.assertNotNull(attr); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java index 445a093..b1d4220 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java @@ -19,25 +19,66 @@ package org.apache.nifi.processors.elasticsearch; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.elasticsearch.DeleteOperationResponse; import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.IndexOperationRequest; +import org.apache.nifi.elasticsearch.IndexOperationResponse; import org.apache.nifi.elasticsearch.SearchResponse; import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; public class TestElasticSearchClientService extends AbstractControllerService implements ElasticSearchClientService { private boolean returnAggs; private boolean throwErrorInSearch; + private boolean throwErrorInDelete; public TestElasticSearchClientService(boolean returnAggs) { this.returnAggs = returnAggs; } @Override + public IndexOperationResponse add(IndexOperationRequest operation) throws IOException { + return add(Arrays.asList(operation)); + } + + @Override + public IndexOperationResponse add(List<IndexOperationRequest> operations) throws IOException { + return new IndexOperationResponse(100L, 100L); + } + + @Override + public DeleteOperationResponse deleteById(String index, String type, String id) throws IOException { + return deleteById(index, type, Arrays.asList(id)); + } + + @Override + public DeleteOperationResponse deleteById(String index, String type, List<String> ids) throws IOException { + if (throwErrorInDelete) { + throw new IOException("Simulated IOException"); + } + return new DeleteOperationResponse(100L); + } + + @Override + public DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException { + return deleteById(index, type, Arrays.asList("1")); + } + + @Override + public Map<String, Object> get(String index, String type, String id) throws IOException { + return new HashMap<String, Object>(){{ + put("msg", "one"); + }}; + } + + @Override public SearchResponse search(String query, String index, String type) throws IOException { if (throwErrorInSearch) { - throw new IOException(); + throw new IOException("Simulated IOException"); } ObjectMapper mapper = new ObjectMapper(); @@ -203,4 +244,8 @@ public class TestElasticSearchClientService extends AbstractControllerService im public void setThrowErrorInSearch(boolean throwErrorInSearch) { this.throwErrorInSearch = throwErrorInSearch; } + + public void setThrowErrorInDelete(boolean throwErrorInDelete) { + this.throwErrorInDelete = throwErrorInDelete; + } }
