Repository: nifi
Updated Branches:
  refs/heads/master f7035f049 -> 3a29c1e4c


NIFI-5052 Added DeleteByQuery ElasticSearch processor.

NIFI-5052 fixed checkstyle issue.

NIFI-5052 Added changes requested in a code review.

NIFI-5052 Fixed a typo.

NIFI-5052 Added changes requested in a code review.

Signed-off-by: Matthew Burgess <[email protected]>

This closes #2616


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3a29c1e4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3a29c1e4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3a29c1e4

Branch: refs/heads/master
Commit: 3a29c1e4ca703a8d042438cd7b576485371dacbd
Parents: f7035f0
Author: Mike Thomsen <[email protected]>
Authored: Sat Apr 7 06:03:01 2018 -0400
Committer: Matthew Burgess <[email protected]>
Committed: Wed May 23 09:14:04 2018 -0400

----------------------------------------------------------------------
 .../elasticsearch/DeleteOperationResponse.java  |  30 ++++
 .../ElasticSearchClientService.java             |  62 +++++++
 .../elasticsearch/IndexOperationRequest.java    |  50 ++++++
 .../elasticsearch/IndexOperationResponse.java   |  36 ++++
 .../nifi-elasticsearch-client-service/pom.xml   |  18 +-
 .../ElasticSearchClientServiceImpl.java         |  83 +++++++++-
 .../ElasticSearch5ClientService_IT.java         | 105 ------------
 .../ElasticSearchClientService_IT.java          | 165 +++++++++++++++++++
 .../src/test/resources/setup.script             |  30 ++--
 .../DeleteByQueryElasticsearch.java             | 145 ++++++++++++++++
 .../ElasticSearchRestProcessor.java             |  91 ++++++++++
 .../elasticsearch/JsonQueryElasticsearch.java   |  71 +-------
 .../org.apache.nifi.processor.Processor         |   2 +
 .../DeleteByQueryElasticsearchTest.java         | 134 +++++++++++++++
 .../TestElasticSearchClientService.java         |  47 +++++-
 15 files changed, 866 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/DeleteOperationResponse.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/DeleteOperationResponse.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/DeleteOperationResponse.java
new file mode 100644
index 0000000..1fd83d7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/DeleteOperationResponse.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch;
+
+public class DeleteOperationResponse {
+    private long took;
+
+    public DeleteOperationResponse(long took) {
+        this.took = took;
+    }
+
+    public long getTook() {
+        return took;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
index 6854c55..188c7bb 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
@@ -25,6 +25,8 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.ssl.SSLContextService;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.Map;
 
 @Tags({"elasticsearch", "client"})
 @CapabilityDescription("A controller service for accessing an ElasticSearch 
client.")
@@ -96,6 +98,66 @@ public interface ElasticSearchClientService extends 
ControllerService {
             .build();
 
     /**
+     * Index a document.
+     *
+     * @param operation A document to index.
+     * @return IndexOperationResponse if successful
+     * @throws IOException thrown when there is an error.
+     */
+    IndexOperationResponse add(IndexOperationRequest operation) throws 
IOException;
+
+    /**
+     * Index multiple documents.
+     *
+     * @param operations A list of documents to index.
+     * @return IndexOperationResponse if successful.
+     * @throws IOException thrown when there is an error.
+     */
+    IndexOperationResponse add(List<IndexOperationRequest> operations) throws 
IOException;
+
+    /**
+     * Delete a document by its ID from an index.
+     *
+     * @param index The index to target.
+     * @param type The type to target. Optional.
+     * @param id The document ID to remove from the selected index.
+     * @return A DeleteOperationResponse object if successful.
+     */
+    DeleteOperationResponse deleteById(String index, String type, String id) 
throws IOException;
+
+
+    /**
+     * Delete multiple documents by ID from an index.
+     * @param index The index to target.
+     * @param type The type to target. Optional.
+     * @param ids A list of document IDs to remove from the selected index.
+     * @return A DeleteOperationResponse object if successful.
+     * @throws IOException thrown when there is an error.
+     */
+    DeleteOperationResponse deleteById(String index, String type, List<String> 
ids) throws IOException;
+
+    /**
+     * Delete documents by query.
+     *
+     * @param query A valid JSON query to be used for finding documents to 
delete.
+     * @param index The index to target.
+     * @param type The type to target within the index. Optional.
+     * @return A DeleteOperationResponse object if successful.
+     */
+    DeleteOperationResponse deleteByQuery(String query, String index, String 
type) throws IOException;
+
+    /**
+     * Get a document by ID.
+     *
+     * @param index The index that holds the document.
+     * @param type The document type. Optional.
+     * @param id The document ID
+     * @return Map if successful, null if not found.
+     * @throws IOException thrown when there is an error.
+     */
+    Map<String, Object> get(String index, String type, String id) throws 
IOException;
+
+    /**
      * Perform a search using the JSON DSL.
      *
      * @param query A JSON string reprensenting the query.

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
new file mode 100644
index 0000000..9281adb
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch;
+
+import java.util.Map;
+
+public class IndexOperationRequest {
+    private String index;
+    private String type;
+    private String id;
+    private Map<String, Object> fields;
+
+    public IndexOperationRequest(String index, String type, String id, 
Map<String, Object> fields) {
+        this.index = index;
+        this.type = type;
+        this.id = id;
+        this.fields = fields;
+    }
+
+    public String getIndex() {
+        return index;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public Map<String, Object> getFields() {
+        return fields;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
new file mode 100644
index 0000000..a22b7aa
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch;
+
+public class IndexOperationResponse {
+    private long took;
+    private long ingestTook;
+
+    public IndexOperationResponse(long took, long ingestTook) {
+        this.took = took;
+        this.ingestTook = ingestTook;
+    }
+
+    public long getTook() {
+        return took;
+    }
+
+    public long getIngestTook() {
+        return ingestTook;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
index 8f52ab8..db79b05 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
@@ -70,11 +70,6 @@
         </dependency>
 
         <dependency>
-            <groupId>org.elasticsearch.client</groupId>
-            <artifactId>rest</artifactId>
-            <version>5.0.1</version>
-        </dependency>
-        <dependency>
             <groupId>com.github.stephenc.findbugs</groupId>
             <artifactId>findbugs-annotations</artifactId>
             <version>1.3.9-1</version>
@@ -120,6 +115,18 @@
             <version>1.7.0-SNAPSHOT</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>5.6.8</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>elasticsearch-rest-high-level-client</artifactId>
+            <version>5.6.8</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 
     <profiles>
@@ -137,7 +144,6 @@
                             <httpPort>9400</httpPort>
                             <version>5.6.2</version>
                             <timeout>90</timeout>
-                            
<pathInitScript>${project.basedir}/src/test/resources/setup.script</pathInitScript>
                         </configuration>
                         <executions>
                             <execution>

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index 9896a19..111490b 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.nifi.elasticsearch;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.io.IOUtils;
+import org.apache.http.Header;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
@@ -34,9 +35,17 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.ssl.SSLContextService;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
 
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
@@ -55,6 +64,7 @@ import java.security.SecureRandom;
 import java.security.UnrecoverableKeyException;
 import java.security.cert.CertificateException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -66,6 +76,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
     static final private List<PropertyDescriptor> properties;
 
     private RestClient client;
+    private RestHighLevelClient highLevelClient;
 
     private String url;
     private Charset charset;
@@ -182,18 +193,19 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
             .setMaxRetryTimeoutMillis(retryTimeout);
 
         this.client = builder.build();
+        this.highLevelClient = new RestHighLevelClient(client);
     }
 
-    private Response runQuery(String query, String index, String type) throws 
IOException {
+    private Response runQuery(String endpoint, String query, String index, 
String type) throws IOException {
         StringBuilder sb = new StringBuilder()
-                .append("/")
-                .append(index);
+            .append("/")
+            .append(index);
         if (type != null && !type.equals("")) {
             sb.append("/")
-                    .append(type);
+            .append(type);
         }
 
-        sb.append("/_search");
+        sb.append(String.format("/%s", endpoint));
 
         HttpEntity queryEntity = new NStringEntity(query, 
ContentType.APPLICATION_JSON);
 
@@ -216,8 +228,67 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
     }
 
     @Override
+    public IndexOperationResponse add(IndexOperationRequest operation) throws 
IOException {
+        return add(Arrays.asList(operation));
+    }
+
+    @Override
+    public IndexOperationResponse add(List<IndexOperationRequest> operations) 
throws IOException {
+        BulkRequest bulkRequest = new BulkRequest();
+        for (int index = 0; index < operations.size(); index++) {
+            IndexOperationRequest or = operations.get(index);
+            IndexRequest indexRequest = new IndexRequest(or.getIndex(), 
or.getType(), or.getId())
+                .source(or.getFields());
+            bulkRequest.add(indexRequest);
+        }
+
+        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+
+        BulkResponse response = highLevelClient.bulk(bulkRequest);
+        IndexOperationResponse retVal = new 
IndexOperationResponse(response.getTookInMillis(), 
response.getIngestTookInMillis());
+
+        return retVal;
+    }
+
+    @Override
+    public DeleteOperationResponse deleteById(String index, String type, 
String id) throws IOException {
+        return deleteById(index, type, Arrays.asList(id));
+    }
+
+    @Override
+    public DeleteOperationResponse deleteById(String index, String type, 
List<String> ids) throws IOException {
+        BulkRequest bulk = new BulkRequest();
+        for (int idx = 0; idx < ids.size(); idx++) {
+            DeleteRequest request = new DeleteRequest(index, type, 
ids.get(idx));
+            bulk.add(request);
+        }
+        BulkResponse response = highLevelClient.bulk(bulk);
+
+        DeleteOperationResponse dor = new 
DeleteOperationResponse(response.getTookInMillis());
+
+        return dor;
+    }
+
+    @Override
+    public DeleteOperationResponse deleteByQuery(String query, String index, 
String type) throws IOException {
+        long start = System.currentTimeMillis();
+        Response response = runQuery("_delete_by_query", query, index, type);
+        long end   = System.currentTimeMillis();
+        Map<String, Object> parsed = parseResponse(response);
+
+        return new DeleteOperationResponse(end - start);
+    }
+
+    @Override
+    public Map<String, Object> get(String index, String type, String id) 
throws IOException {
+        GetRequest get = new GetRequest(index, type, id);
+        GetResponse resp = highLevelClient.get(get, new Header[]{});
+        return resp.getSource();
+    }
+
+    @Override
     public SearchResponse search(String query, String index, String type) 
throws IOException {
-        Response response = runQuery(query, index, type);
+        Response response = runQuery("_search", query, index, type);
         Map<String, Object> parsed = parseResponse(response);
 
         int took = (Integer)parsed.get("took");

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.java
deleted file mode 100644
index 7ab618d..0000000
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.elasticsearch.integration;
-
-import org.apache.nifi.elasticsearch.ElasticSearchClientService;
-import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl;
-import org.apache.nifi.elasticsearch.SearchResponse;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ElasticSearch5ClientService_IT {
-
-    private TestRunner runner;
-    private ElasticSearchClientServiceImpl service;
-
-    @Before
-    public void before() throws Exception {
-        runner = 
TestRunners.newTestRunner(TestControllerServiceProcessor.class);
-        service = new ElasticSearchClientServiceImpl();
-        runner.addControllerService("Client Service", service);
-        runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, 
"http://localhost:9400");
-        runner.setProperty(service, 
ElasticSearchClientService.CONNECT_TIMEOUT, "10000");
-        runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, 
"60000");
-        runner.setProperty(service, ElasticSearchClientService.RETRY_TIMEOUT, 
"60000");
-        try {
-            runner.enableControllerService(service);
-        } catch (Exception ex) {
-            ex.printStackTrace();
-            throw ex;
-        }
-    }
-
-    @After
-    public void after() throws Exception {
-        service.onDisabled();
-    }
-
-    @Test
-    public void testBasicSearch() throws Exception {
-        String query = "{\n" +
-                "\t\"size\": 10,\n" +
-                "\t\"query\": {\n" +
-                "\t\t\"match_all\": {}\n" +
-                "\t},\n" +
-                "\t\"aggs\": {\n" +
-                "\t\t\"term_counts\": {\n" +
-                "\t\t\t\"terms\": {\n" +
-                "\t\t\t\t\"field\": \"msg\",\n" +
-                "\t\t\t\t\"size\": 5\n" +
-                "\t\t\t}\n" +
-                "\t\t}\n" +
-                "\t}\n" +
-                "}";
-        SearchResponse response = service.search(query, "messages", "message");
-        Assert.assertNotNull("Response was null", response);
-
-        Assert.assertEquals("Wrong count", response.getNumberOfHits(), 15);
-        Assert.assertFalse("Timed out", response.isTimedOut());
-        Assert.assertNotNull("Hits was null", response.getHits());
-        Assert.assertEquals("Wrong number of hits", 10, 
response.getHits().size());
-        Assert.assertNotNull("Aggregations are missing", 
response.getAggregations());
-        Assert.assertEquals("Aggregation count is wrong", 1, 
response.getAggregations().size());
-
-        Map<String, Object> termCounts = (Map<String, Object>) 
response.getAggregations().get("term_counts");
-        Assert.assertNotNull("Term counts was missing", termCounts);
-        List<Map<String, Object>> buckets = (List<Map<String, Object>>) 
termCounts.get("buckets");
-        Assert.assertNotNull("Buckets branch was empty", buckets);
-        Map<String, Integer> expected = new HashMap<>();
-        expected.put("one", 1);
-        expected.put("two", 2);
-        expected.put("three", 3);
-        expected.put("four", 4);
-        expected.put("five", 5);
-
-        for (Map<String, Object> aggRes : buckets) {
-            String key = (String)aggRes.get("key");
-            Integer docCount = (Integer)aggRes.get("doc_count");
-
-            Assert.assertEquals(String.format("%s did not match", key), 
expected.get(key), docCount);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
new file mode 100644
index 0000000..687faf0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch.integration;
+
+import org.apache.nifi.elasticsearch.DeleteOperationResponse;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl;
+import org.apache.nifi.elasticsearch.IndexOperationRequest;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ElasticSearchClientService_IT {
+
+    private TestRunner runner;
+    private ElasticSearchClientServiceImpl service;
+
+    static final String INDEX = "messages";
+    static final String TYPE  = "message";
+
+    @Before
+    public void before() throws Exception {
+        runner = 
TestRunners.newTestRunner(TestControllerServiceProcessor.class);
+        service = new ElasticSearchClientServiceImpl();
+        runner.addControllerService("Client Service", service);
+        runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, 
"http://localhost:9400");
+        runner.setProperty(service, 
ElasticSearchClientService.CONNECT_TIMEOUT, "10000");
+        runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, 
"60000");
+        runner.setProperty(service, ElasticSearchClientService.RETRY_TIMEOUT, 
"60000");
+        try {
+            runner.enableControllerService(service);
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            throw ex;
+        }
+
+        Map<String, Integer> expected = new HashMap<>();
+        expected.put("one", 1);
+        expected.put("two", 2);
+        expected.put("three", 3);
+        expected.put("four", 4);
+        expected.put("five", 5);
+
+
+        int index = 1;
+        List<IndexOperationRequest> docs = new ArrayList<>();
+        for (Map.Entry<String, Integer> entry : expected.entrySet()) {
+            for (int idx = 0; idx < entry.getValue(); idx++) {
+                Map<String, Object> fields = new HashMap<>();
+                fields.put("msg", entry.getKey());
+                IndexOperationRequest ior = new IndexOperationRequest(INDEX, 
TYPE, String.valueOf(index++), fields);
+                docs.add(ior);
+            }
+        }
+        service.add(docs);
+    }
+
+    @After
+    public void after() throws Exception {
+        service.onDisabled();
+    }
+
+    @Test
+    public void testBasicSearch() throws Exception {
+        String query = "{\n" +
+                "\t\"size\": 10,\n" +
+                "\t\"query\": {\n" +
+                "\t\t\"match_all\": {}\n" +
+                "\t},\n" +
+                "\t\"aggs\": {\n" +
+                "\t\t\"term_counts\": {\n" +
+                "\t\t\t\"terms\": {\n" +
+                "\t\t\t\t\"field\": \"msg.keyword\",\n" +
+                "\t\t\t\t\"size\": 5\n" +
+                "\t\t\t}\n" +
+                "\t\t}\n" +
+                "\t}\n" +
+                "}";
+        SearchResponse response = service.search(query, INDEX, TYPE);
+        Assert.assertNotNull("Response was null", response);
+
+        Assert.assertEquals("Wrong count", 15, response.getNumberOfHits());
+        Assert.assertFalse("Timed out", response.isTimedOut());
+        Assert.assertNotNull("Hits was null", response.getHits());
+        Assert.assertEquals("Wrong number of hits", 10, 
response.getHits().size());
+        Assert.assertNotNull("Aggregations are missing", 
response.getAggregations());
+        Assert.assertEquals("Aggregation count is wrong", 1, 
response.getAggregations().size());
+
+        Map<String, Object> termCounts = (Map<String, Object>) 
response.getAggregations().get("term_counts");
+        Assert.assertNotNull("Term counts was missing", termCounts);
+        List<Map<String, Object>> buckets = (List<Map<String, Object>>) 
termCounts.get("buckets");
+        Assert.assertNotNull("Buckets branch was empty", buckets);
+        Map<String, Integer> expected = new HashMap<>();
+        expected.put("one", 1);
+        expected.put("two", 2);
+        expected.put("three", 3);
+        expected.put("four", 4);
+        expected.put("five", 5);
+
+        for (Map<String, Object> aggRes : buckets) {
+            String key = (String)aggRes.get("key");
+            Integer docCount = (Integer)aggRes.get("doc_count");
+
+            Assert.assertEquals(String.format("%s did not match", key), 
expected.get(key), docCount);
+        }
+    }
+
+    @Test
+    public void testDeleteByQuery() throws Exception {
+        String query = "{\"query\":{\"match\":{\"msg\":\"five\"}}}";
+        DeleteOperationResponse response = service.deleteByQuery(query, INDEX, 
TYPE);
+        Assert.assertNotNull(response);
+        Assert.assertTrue(response.getTook() > 0);
+    }
+
+    @Test
+    public void testDeleteById() throws Exception {
+        final String ID = "1";
+        DeleteOperationResponse response = service.deleteById(INDEX, TYPE, ID);
+        Assert.assertNotNull(response);
+        Assert.assertTrue(response.getTook() > 0);
+        Map<String, Object> doc = service.get(INDEX, TYPE, ID);
+        Assert.assertNull(doc);
+        doc = service.get(INDEX, TYPE, "2");
+        Assert.assertNotNull(doc);
+    }
+
+    @Test
+    public void testGet() throws IOException {
+        Map<String, Object> old = null;
+        for (int index = 1; index <= 15; index++) {
+            String id = String.valueOf(index);
+            Map<String, Object> doc = service.get(INDEX, TYPE, id);
+            Assert.assertNotNull(doc);
+            Assert.assertNotNull(doc.toString() + "\t" + 
doc.keySet().toString(), doc.get("msg"));
+            Assert.assertFalse(doc == old);
+            old = doc;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script
index 69671b6..8cf4c97 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script
@@ -15,18 +15,18 @@
 #create mapping
 PUT:messages/:{ "mappings":{"message":{ "properties":{ 
"msg":{"type":"keyword"}}}}}
 #add document
-PUT:messages/message/1:{ "msg ":"one" }
-PUT:messages/message/2:{ "msg ":"two" }
-PUT:messages/message/3:{ "msg ":"two" }
-PUT:messages/message/4:{ "msg ":"three" }
-PUT:messages/message/5:{ "msg ":"three" }
-PUT:messages/message/6:{ "msg ":"three" }
-PUT:messages/message/7:{ "msg ":"four" }
-PUT:messages/message/8:{ "msg ":"four" }
-PUT:messages/message/9:{ "msg ":"four" }
-PUT:messages/message/10:{ "msg ":"four" }
-PUT:messages/message/11:{ "msg ":"five" }
-PUT:messages/message/12:{ "msg ":"five" }
-PUT:messages/message/13:{ "msg ":"five" }
-PUT:messages/message/14:{ "msg ":"five" }
-PUT:messages/message/15:{ "msg ":"five" }
\ No newline at end of file
+PUT:messages/message/1:{ "msg":"one" }
+PUT:messages/message/2:{ "msg":"two" }
+PUT:messages/message/3:{ "msg":"two" }
+PUT:messages/message/4:{ "msg":"three" }
+PUT:messages/message/5:{ "msg":"three" }
+PUT:messages/message/6:{ "msg":"three" }
+PUT:messages/message/7:{ "msg":"four" }
+PUT:messages/message/8:{ "msg":"four" }
+PUT:messages/message/9:{ "msg":"four" }
+PUT:messages/message/10:{ "msg":"four" }
+PUT:messages/message/11:{ "msg":"five" }
+PUT:messages/message/12:{ "msg":"five" }
+PUT:messages/message/13:{ "msg":"five" }
+PUT:messages/message/14:{ "msg":"five" }
+PUT:messages/message/15:{ "msg":"five" }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
new file mode 100644
index 0000000..609d8ed
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.DeleteOperationResponse;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Runs an Elasticsearch delete-by-query through the configured
+ * {@link ElasticSearchClientService}.
+ *
+ * <p>The query is taken from the Query property when it is set, otherwise from the
+ * content of the incoming flowfile (see {@code getQuery}). On success the input
+ * flowfile, or a newly created one when there was no input, is routed to
+ * {@link #REL_SUCCESS} carrying {@link #TOOK_ATTRIBUTE}. On failure an existing input
+ * flowfile is routed to {@link #REL_FAILURE} carrying {@link #ERROR_ATTRIBUTE}.</p>
+ */
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@CapabilityDescription("Delete from an ElasticSearch index using a query. The query can be loaded from a flowfile body " +
+        "or from the Query parameter.")
+@Tags({ "elastic", "elasticsearch", "delete", "query"})
+@WritesAttributes({
+    @WritesAttribute(attribute = "elasticsearch.delete.took", description = "The amount of time that it took to complete the delete operation in ms."),
+    @WritesAttribute(attribute = "elasticsearch.delete.error", description = "The error message provided by ElasticSearch if there is an error running the delete.")
+})
+public class DeleteByQueryElasticsearch extends AbstractProcessor implements ElasticSearchRestProcessor {
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+        .description("If the delete by query fails, and a flowfile was read, it will be sent to this relationship.").build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+        .description("If the delete by query succeeds, and a flowfile was read, it will be sent to this relationship.")
+        .build();
+
+
+    // Attribute names written to the outgoing flowfile; package-private so tests can reference them.
+    static final String TOOK_ATTRIBUTE = "elasticsearch.delete.took";
+    static final String ERROR_ATTRIBUTE = "elasticsearch.delete.error";
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    // Resolved once per scheduling in onScheduled(); volatile because onTrigger may run on other threads.
+    private volatile ElasticSearchClientService clientService;
+
+    static {
+        final Set<Relationship> _rels = new HashSet<>();
+        _rels.add(REL_SUCCESS);
+        _rels.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(_rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    /**
+     * Looks up the client service selected in the Client Service property.
+     *
+     * @param context the context the processor is being scheduled with
+     */
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
+    }
+
+    /**
+     * Executes one delete-by-query and routes the result.
+     *
+     * <p>Any exception is caught here: it is logged, the processor yields, and the
+     * input flowfile (if there is one) goes to failure with the error attribute set.</p>
+     *
+     * @param context provides the property values and connection state
+     * @param session used to read, create and transfer flowfiles
+     * @throws ProcessException declared by the overridden method
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile input = null;
+        if (context.hasIncomingConnection()) {
+            input = session.get();
+
+            // Nothing queued on a real (non-loop) upstream connection: wait for the next trigger.
+            if (input == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        try {
+            final String query = getQuery(input, context, session);
+            final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+            final String type  = context.getProperty(TYPE).isSet()
+                    ? context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue()
+                    : null;
+            final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet()
+                    ? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
+                    : null;
+            DeleteOperationResponse dor = clientService.deleteByQuery(query, index, type);
+
+            // Without an incoming flowfile, create one so the outcome is still reported downstream.
+            if (input == null) {
+                input = session.create();
+            }
+
+            Map<String, String> attrs = new HashMap<>();
+            attrs.put(TOOK_ATTRIBUTE, String.valueOf(dor.getTook()));
+            if (!StringUtils.isBlank(queryAttr)) {
+                attrs.put(queryAttr, query);
+            }
+
+            input = session.putAllAttributes(input, attrs);
+
+            session.transfer(input, REL_SUCCESS);
+        } catch (Exception e) {
+            if (input != null) {
+                input = session.putAttribute(input, ERROR_ATTRIBUTE, describeError(e));
+                session.transfer(input, REL_FAILURE);
+            }
+            getLogger().error("Error running delete by query: ", e);
+            context.yield();
+        }
+    }
+
+    /**
+     * Produces a non-null description of a failure for the error attribute.
+     * Exceptions are not required to carry a message, so fall back to
+     * {@code toString()}, which always includes at least the exception class name.
+     */
+    private static String describeError(final Exception e) {
+        final String message = e.getMessage();
+        return message != null ? message : e.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticSearchRestProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticSearchRestProcessor.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticSearchRestProcessor.java
new file mode 100644
index 0000000..e94a1cc
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticSearchRestProcessor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * Property descriptors and query-resolution logic shared by the processors that talk
+ * to Elasticsearch through an {@link ElasticSearchClientService}.
+ */
+public interface ElasticSearchRestProcessor {
+    PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
+            .name("el-rest-fetch-index")
+            .displayName("Index")
+            .description("The name of the index to use.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
+            .name("el-rest-type")
+            .displayName("Type")
+            .description("The type of this document (used by Elasticsearch for indexing and searching)")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+            .name("el-rest-query")
+            .displayName("Query")
+            .description("A query in JSON syntax, not Lucene syntax. Ex: {\"query\":{\"match\":{\"somefield\":\"somevalue\"}}}. " +
+                    "If this parameter is not set, the query will be read from the flowfile content.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    PropertyDescriptor QUERY_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("el-query-attribute")
+            .displayName("Query Attribute")
+            .description("If set, the executed query will be set on each result flowfile in the specified attribute.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(Validator.VALID)
+            .required(false)
+            .build();
+
+    PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
+            .name("el-rest-client-service")
+            .displayName("Client Service")
+            .description("An ElasticSearch client service to use for running queries.")
+            .identifiesControllerService(ElasticSearchClientService.class)
+            .required(true)
+            .build();
+
+    /**
+     * Resolves the query to execute.
+     *
+     * <p>The Query property wins when it is set (evaluated against {@code input}'s
+     * attributes); otherwise the full content of {@code input} is used.</p>
+     *
+     * @param input   the incoming flowfile, or {@code null} when there is none
+     * @param context used to read the Query property
+     * @param session used to read the flowfile content
+     * @return the query text, or {@code null} when the property is unset and there is no input flowfile
+     * @throws IOException if closing the content buffer or a callee reports an I/O failure
+     */
+    default String getQuery(FlowFile input, ProcessContext context, ProcessSession session) throws IOException {
+        String retVal = null;
+        if (context.getProperty(QUERY).isSet()) {
+            retVal = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
+        } else if (input != null) {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            session.exportTo(input, out);
+            out.close();
+
+            // Decode explicitly as UTF-8 instead of the JVM's platform default charset,
+            // so non-ASCII query text is read the same way on every host.
+            retVal = new String(out.toByteArray(), java.nio.charset.StandardCharsets.UTF_8);
+        }
+
+        return retVal;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
index 64be962..40d6118 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
@@ -27,7 +27,6 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.Validator;
 import org.apache.nifi.elasticsearch.ElasticSearchClientService;
 import org.apache.nifi.elasticsearch.SearchResponse;
 import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -38,10 +37,8 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StringUtils;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -62,7 +59,7 @@ import java.util.Set;
         "ElasticSearch JSON DSL. It does not automatically paginate queries 
for the user. If an incoming relationship is added to this " +
         "processor, it will use the flowfile's content for the query. Care 
should be taken on the size of the query because the entire response " +
         "from ElasticSearch will be loaded into memory all at once and 
converted into the resulting flowfiles.")
-public class JsonQueryElasticsearch extends AbstractProcessor {
+public class JsonQueryElasticsearch extends AbstractProcessor implements 
ElasticSearchRestProcessor {
     public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original")
             .description("All original flowfiles that don't cause an error to 
occur go to this relationship. " +
                     "This applies even if you select the \"split up hits\" 
option to send individual hits to the " +
@@ -78,49 +75,6 @@ public class JsonQueryElasticsearch extends 
AbstractProcessor {
     public static final Relationship REL_AGGREGATIONS = new 
Relationship.Builder().name("aggregations")
             .description("Aggregations are routed to this relationship.")
             .build();
-
-    public static final PropertyDescriptor INDEX = new 
PropertyDescriptor.Builder()
-            .name("el-rest-fetch-index")
-            .displayName("Index")
-            .description("The name of the index to read from")
-            .required(true)
-            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor TYPE = new 
PropertyDescriptor.Builder()
-            .name("el-rest-type")
-            .displayName("Type")
-            .description("The type of this document (used by Elasticsearch for 
indexing and searching)")
-            .required(false)
-            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
-            .name("el-rest-query")
-            .displayName("Query")
-            .description("A query in JSON syntax, not Lucene syntax. Ex: " +
-                    "{\n" +
-                    "\t\"query\": {\n" +
-                    "\t\t\"match\": {\n" +
-                    "\t\t\t\"name\": \"John Smith\"\n" +
-                    "\t\t}\n" +
-                    "\t}\n" +
-                    "}")
-            .required(false)
-            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor QUERY_ATTRIBUTE = new 
PropertyDescriptor.Builder()
-            .name("el-query-attribute")
-            .displayName("Query Attribute")
-            .description("If set, the executed query will be set on each 
result flowfile in the specified attribute.")
-            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(Validator.VALID)
-            .required(false)
-            .build();
-
     public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
         "splitUp-yes",
         "Yes",
@@ -151,14 +105,6 @@ public class JsonQueryElasticsearch extends 
AbstractProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
             .build();
 
-    public static final PropertyDescriptor CLIENT_SERVICE = new 
PropertyDescriptor.Builder()
-            .name("el-rest-client-service")
-            .displayName("Client Service")
-            .description("An ElasticSearch client service to use for running 
queries.")
-            .identifiesControllerService(ElasticSearchClientService.class)
-            .required(true)
-            .build();
-
     private static final Set<Relationship> relationships;
     private static final List<PropertyDescriptor> propertyDescriptors;
 
@@ -207,21 +153,6 @@ public class JsonQueryElasticsearch extends 
AbstractProcessor {
 
     private final ObjectMapper mapper = new ObjectMapper();
 
-    private String getQuery(FlowFile input, ProcessContext context, 
ProcessSession session) throws IOException {
-        String retVal = null;
-        if (context.getProperty(QUERY).isSet()) {
-            retVal = 
context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
-        } else if (input != null) {
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
-            session.exportTo(input, out);
-            out.close();
-
-            retVal = new String(out.toByteArray());
-        }
-
-        return retVal;
-    }
-
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         FlowFile input = null;

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index d57ff36..ac8c915 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,4 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+org.apache.nifi.processors.elasticsearch.DeleteByQueryElasticsearch
 org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.java
new file mode 100644
index 0000000..ade1102
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Unit tests for {@link DeleteByQueryElasticsearch}, run against the in-memory
+ * {@link TestElasticSearchClientService} stub rather than a real cluster.
+ */
+public class DeleteByQueryElasticsearchTest {
+    private static final String INDEX = "test_idx";
+    private static final String TYPE  = "test_type";
+    private static final String QUERY_ATTR = "es.delete.query";
+    private static final String CLIENT_NAME = "clientService";
+
+    private TestElasticSearchClientService client;
+
+    /** Registers and enables the stub client service and points the processor at it. */
+    private void initClient(TestRunner runner) throws Exception {
+        client = new TestElasticSearchClientService(true);
+        runner.addControllerService(CLIENT_NAME, client);
+        runner.enableControllerService(client);
+        runner.setProperty(DeleteByQueryElasticsearch.CLIENT_SERVICE, CLIENT_NAME);
+    }
+
+    /**
+     * Asserts the success-path outcome: one flowfile on success, none on failure, with
+     * the "took" attribute and the executed query recorded on the flowfile.
+     *
+     * @param queryParam the query text expected in the query attribute
+     */
+    private void postTest(TestRunner runner, String queryParam) {
+        runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 0);
+        runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 1);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_SUCCESS);
+        String attr = flowFiles.get(0).getAttribute(DeleteByQueryElasticsearch.TOOK_ATTRIBUTE);
+        String query = flowFiles.get(0).getAttribute(QUERY_ATTR);
+        Assert.assertNotNull(attr);
+        // JUnit's signature is (expected, actual); the stub client reports a "took" of 100.
+        Assert.assertEquals("100", attr);
+        Assert.assertNotNull(query);
+        Assert.assertEquals(queryParam, query);
+    }
+
+    /** The query is read from the flowfile content when the Query property is unset. */
+    @Test
+    public void testWithFlowfileInput() throws Exception {
+        String query = "{ \"query\": { \"match_all\": {} }}";
+        TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class);
+        runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX);
+        runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE);
+        runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR);
+        initClient(runner);
+        runner.assertValid();
+        runner.enqueue(query);
+        runner.run();
+
+        postTest(runner, query);
+    }
+
+    /**
+     * The Query property is used, first with expression language resolved from flowfile
+     * attributes, then with no incoming connection at all.
+     */
+    @Test
+    public void testWithQuery() throws Exception {
+        String query = "{\n" +
+            "\t\"query\": {\n" +
+            "\t\t\"match\": {\n" +
+            "\t\t\t\"${field.name}.keyword\": \"test\"\n" +
+            "\t\t}\n" +
+            "\t}\n" +
+            "}";
+        // Plain HashMap instead of double-brace initialisation, which would create an
+        // anonymous subclass holding a reference to this test instance.
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put("field.name", "test_field");
+        TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class);
+        runner.setProperty(DeleteByQueryElasticsearch.QUERY, query);
+        runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX);
+        runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE);
+        runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR);
+        initClient(runner);
+        runner.assertValid();
+        runner.enqueue("", attrs);
+        runner.run();
+
+        postTest(runner, query.replace("${field.name}", "test_field"));
+
+        runner.clearTransferState();
+
+        query = "{\n" +
+            "\t\"query\": {\n" +
+            "\t\t\"match\": {\n" +
+            "\t\t\t\"test_field.keyword\": \"test\"\n" +
+            "\t\t}\n" +
+            "\t}\n" +
+            "}";
+        runner.setProperty(DeleteByQueryElasticsearch.QUERY, query);
+        runner.setIncomingConnection(false);
+        runner.assertValid();
+        runner.run();
+        postTest(runner, query);
+    }
+
+    /** A failing client call routes the input to failure with the error attribute set. */
+    @Test
+    public void testErrorAttribute() throws Exception {
+        String query = "{ \"query\": { \"match_all\": {} }}";
+        TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class);
+        runner.setProperty(DeleteByQueryElasticsearch.QUERY, query);
+        runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX);
+        runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE);
+        runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR);
+        initClient(runner);
+        client.setThrowErrorInDelete(true);
+        runner.assertValid();
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 0);
+        runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 1);
+
+        MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_FAILURE).get(0);
+        String attr = mockFlowFile.getAttribute(DeleteByQueryElasticsearch.ERROR_ATTRIBUTE);
+        Assert.assertNotNull(attr);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a29c1e4/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java
index 445a093..b1d4220 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java
@@ -19,25 +19,66 @@ package org.apache.nifi.processors.elasticsearch;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.elasticsearch.DeleteOperationResponse;
 import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.IndexOperationRequest;
+import org.apache.nifi.elasticsearch.IndexOperationResponse;
 import org.apache.nifi.elasticsearch.SearchResponse;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 public class TestElasticSearchClientService extends AbstractControllerService 
implements ElasticSearchClientService {
     private boolean returnAggs;
     private boolean throwErrorInSearch;
+    private boolean throwErrorInDelete;
 
     public TestElasticSearchClientService(boolean returnAggs) { // only records the flag; no other setup happens here
         this.returnAggs = returnAggs; // NOTE(review): presumably consulted by search() to decide whether aggregations are returned - confirm
     }
 
     @Override
+    public IndexOperationResponse add(IndexOperationRequest operation) throws IOException {
+        // Wrap the single request so the list-based overload does the actual work.
+        final List<IndexOperationRequest> singleRequest = Arrays.asList(operation);
+        return add(singleRequest);
+    }
+
+    @Override
+    public IndexOperationResponse add(List<IndexOperationRequest> operations) throws IOException {
+        // Stub: the requests are ignored and a fixed response is returned.
+        final long cannedValue = 100L;
+        return new IndexOperationResponse(cannedValue, cannedValue);
+    }
+
+    @Override
+    public DeleteOperationResponse deleteById(String index, String type, String id) throws IOException {
+        // Delegate to the multi-id overload with a one-element list.
+        final List<String> singleId = Arrays.asList(id);
+        return deleteById(index, type, singleId);
+    }
+
+    @Override
+    public DeleteOperationResponse deleteById(String index, String type, List<String> ids) throws IOException {
+        // Normal path: hand back a canned response; the arguments are not inspected.
+        if (!throwErrorInDelete) {
+            return new DeleteOperationResponse(100L);
+        }
+
+        // Failure path, enabled through setThrowErrorInDelete(true).
+        throw new IOException("Simulated IOException");
+    }
+
+    @Override
+    public DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException {
+        // The query text is ignored; reuse the delete-by-id stub (and its simulated failure) with a fixed id.
+        final List<String> fixedIds = Arrays.asList("1");
+        return deleteById(index, type, fixedIds);
+    }
+
+    @Override
+    public Map<String, Object> get(String index, String type, String id) throws IOException {
+        // Stub: always returns the same single-field document, whatever is asked for.
+        // A plain HashMap replaces double-brace initialisation, which would create an
+        // anonymous subclass holding a reference to this service.
+        final Map<String, Object> doc = new HashMap<>();
+        doc.put("msg", "one");
+        return doc;
+    }
+
+    @Override
     public SearchResponse search(String query, String index, String type) 
throws IOException {
         if (throwErrorInSearch) {
-            throw new IOException();
+            throw new IOException("Simulated IOException");
         }
 
         ObjectMapper mapper = new ObjectMapper();
@@ -203,4 +244,8 @@ public class TestElasticSearchClientService extends 
AbstractControllerService im
     public void setThrowErrorInSearch(boolean throwErrorInSearch) { // test hook: when true, search() throws an IOException
         this.throwErrorInSearch = throwErrorInSearch;
     }
+
+    public void setThrowErrorInDelete(boolean throwErrorInDelete) { // test hook: when true, the deleteById overloads and deleteByQuery throw an IOException
+        this.throwErrorInDelete = throwErrorInDelete;
+    }
 }

Reply via email to