This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 1745d2a  - NIFI-9388: add GetElasticsearch to fetch Elasticsearch 
document using the ElasticsearchClientService - NIFI-9387: add Proxy capability 
to ElasticsearchClientService - NIFI-1576: allow GetElasticsearch to run 
without requiring FlowFile input
1745d2a is described below

commit 1745d2a88e3afc95fa91dda9ab23e8abe4a85ed1
Author: Chris Sampson <[email protected]>
AuthorDate: Thu Nov 18 22:06:46 2021 +0000

    - NIFI-9388: add GetElasticsearch to fetch Elasticsearch document using the 
ElasticsearchClientService
    - NIFI-9387: add Proxy capability to ElasticsearchClientService
    - NIFI-1576: allow GetElasticsearch to run without requiring FlowFile input
    
    Signed-off-by: Joe Gresock <[email protected]>
    
    This closes #5535.
---
 .../nifi-elasticsearch-client-service-api/pom.xml  |   6 +
 .../elasticsearch/ElasticSearchClientService.java  |   3 +
 ...earchError.java => ElasticsearchException.java} |  29 ++-
 .../nifi-elasticsearch-client-service/pom.xml      |  11 +-
 .../ElasticSearchClientServiceImpl.java            |  64 +++--
 .../ElasticSearchStringLookupService.java          |   4 +-
 .../ElasticSearchClientService_IT.groovy           |  40 +--
 .../AbstractByQueryElasticsearch.java              |  13 +-
 .../AbstractJsonQueryElasticsearch.java            |  13 +-
 .../elasticsearch/DeleteByQueryElasticsearch.java  |   2 +-
 .../processors/elasticsearch/GetElasticsearch.java | 209 ++++++++++++++++
 .../elasticsearch/JsonQueryElasticsearch.java      |   5 +-
 .../PaginatedJsonQueryElasticsearch.java           |   7 +-
 .../elasticsearch/PutElasticsearchRecord.java      |  20 +-
 .../elasticsearch/SearchElasticsearch.java         |   7 +-
 .../elasticsearch/UpdateByQueryElasticsearch.java  |   2 +-
 .../services/org.apache.nifi.processor.Processor   |   1 +
 .../AbstractJsonQueryElasticsearchTest.groovy      |   2 +-
 ...tractPaginatedJsonQueryElasticsearchTest.groovy |   6 +-
 .../elasticsearch/GetElasticsearchTest.groovy      | 272 +++++++++++++++++++++
 .../elasticsearch/SearchElasticsearchTest.groovy   |   2 +-
 .../TestElasticsearchClientService.groovy          |  22 +-
 .../mock/MockBulkLoadClientService.groovy          |   4 +-
 ...or.groovy => MockElasticsearchException.groovy} |  11 +-
 24 files changed, 680 insertions(+), 75 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
index 6a649ce..9c7160c 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
@@ -44,6 +44,12 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-proxy-configuration-api</artifactId>
+            <version>1.16.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
             <version>${jackson.version}</version>
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
index d186306..138d0a9 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
@@ -24,6 +24,8 @@ import org.apache.nifi.components.Validator;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
 import org.apache.nifi.ssl.SSLContextService;
 
 import java.util.List;
@@ -49,6 +51,7 @@ public interface ElasticSearchClientService extends 
ControllerService {
             .identifiesControllerService(SSLContextService.class)
             .addValidator(Validator.VALID)
             .build();
+    PropertyDescriptor PROXY_CONFIGURATION_SERVICE = 
ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxySpec.HTTP);
     PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
             .name("el-cs-username")
             .displayName("Username")
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchError.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchException.java
similarity index 69%
rename from 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchError.java
rename to 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchException.java
index 65e11ad..9c4d259 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchError.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchException.java
@@ -20,7 +20,7 @@ package org.apache.nifi.elasticsearch;
 import java.util.HashSet;
 import java.util.Set;
 
-public class ElasticsearchError extends RuntimeException {
+public class ElasticsearchException extends RuntimeException {
     /**
      * These are names of common Elasticsearch exceptions where it is safe to 
assume
      * that it's OK to retry the operation instead of just sending it to an 
error relationship.
@@ -32,17 +32,32 @@ public class ElasticsearchError extends RuntimeException {
         add("NodeClosedException");
     }};
 
-    protected boolean isElastic;
+    protected boolean elastic;
 
-    public ElasticsearchError(final Exception ex) {
+    protected boolean notFound;
+
+    public ElasticsearchException(final Exception ex) {
         super(ex);
+
         final boolean isKnownException = 
ELASTIC_ERROR_NAMES.contains(ex.getClass().getSimpleName());
-        final boolean isServiceUnavailable = 
"ResponseException".equals(ex.getClass().getSimpleName())
-                && ex.getMessage().contains("503 Service Unavailable");
-        isElastic = isKnownException || isServiceUnavailable;
+
+        final boolean isServiceUnavailable;
+        if ("ResponseException".equals(ex.getClass().getSimpleName())) {
+            isServiceUnavailable = ex.getMessage().contains("503 Service 
Unavailable");
+
+            notFound = ex.getMessage().contains("404 Not Found");
+        } else {
+            isServiceUnavailable = false;
+        }
+
+        elastic = isKnownException || isServiceUnavailable;
     }
 
     public boolean isElastic() {
-        return isElastic;
+        return elastic;
+    }
+
+    public boolean isNotFound() {
+        return notFound;
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
index ae38810..796ac21 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
@@ -76,6 +76,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-proxy-configuration-api</artifactId>
+            <version>1.16.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-elasticsearch-client-service-api</artifactId>
             <version>1.16.0-SNAPSHOT</version>
             <scope>provided</scope>
@@ -143,9 +149,10 @@
              to an Elastic-provided Elasticsearch instead of an instance 
provided by someone else (e.g. AWS OpenSearch)
              see: 
https://opensearch.org/blog/community/2021/08/community-clients/ for more info.
 
-             Note: the low-level elasticsearch-rest-client remains licensed 
with Apache 2.0 even after the move
+             Note: the low-level elasticsearch-rest-client remains licensed 
with Apache 2.0
+             
(https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_license.html)
 even after the move
              of the main Elasticsearch product and 
elasticsearch-rest-high-level-client to Elastic 2.0/SSPL 1.0 in v7.11.0+ -->
-            <version>7.10.2</version>
+            <version>7.13.4</version>
             <scope>compile</scope>
             <exclusions>
                 <exclusion>
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index 6575050..202c873 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -36,6 +36,8 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxyConfigurationService;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.StopWatch;
@@ -50,6 +52,7 @@ import javax.net.ssl.SSLContext;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.MalformedURLException;
+import java.net.Proxy;
 import java.net.URL;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
@@ -79,6 +82,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
         props.add(ElasticSearchClientService.USERNAME);
         props.add(ElasticSearchClientService.PASSWORD);
         props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE);
+        props.add(ElasticSearchClientService.PROXY_CONFIGURATION_SERVICE);
         props.add(ElasticSearchClientService.CONNECT_TIMEOUT);
         props.add(ElasticSearchClientService.SOCKET_TIMEOUT);
         props.add(ElasticSearchClientService.RETRY_TIMEOUT);
@@ -129,6 +133,8 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
         final Integer connectTimeout = 
context.getProperty(CONNECT_TIMEOUT).asInteger();
         final Integer readTimeout    = 
context.getProperty(SOCKET_TIMEOUT).asInteger();
 
+        final ProxyConfigurationService proxyConfigurationService = 
context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
+
         final HttpHost[] hh = new HttpHost[hostsSplit.length];
         for (int x = 0; x < hh.length; x++) {
             final URL u = new URL(hostsSplit[x]);
@@ -150,10 +156,22 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
                         httpClientBuilder.setSSLContext(sslContext);
                     }
 
+                    CredentialsProvider credentialsProvider = null;
                     if (username != null && password != null) {
-                        final CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
-                        credentialsProvider.setCredentials(AuthScope.ANY,
-                                new UsernamePasswordCredentials(username, 
password));
+                        credentialsProvider = addCredentials(null, 
AuthScope.ANY, username, password);
+                    }
+
+                    if (proxyConfigurationService != null) {
+                        final ProxyConfiguration proxyConfiguration = 
proxyConfigurationService.getConfiguration();
+                        if (Proxy.Type.HTTP == 
proxyConfiguration.getProxyType()) {
+                            final HttpHost proxy = new 
HttpHost(proxyConfiguration.getProxyServerHost(), 
proxyConfiguration.getProxyServerPort(), "http");
+                            httpClientBuilder.setProxy(proxy);
+
+                            credentialsProvider = 
addCredentials(credentialsProvider, new AuthScope(proxy), 
proxyConfiguration.getProxyUserName(), 
proxyConfiguration.getProxyUserPassword());
+                        }
+                    }
+
+                    if (credentialsProvider != null) {
                         
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                     }
 
@@ -168,6 +186,19 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
         this.client = builder.build();
     }
 
+    private CredentialsProvider addCredentials(final CredentialsProvider 
credentialsProvider, final AuthScope authScope, final String username, final 
String password) {
+        final CredentialsProvider cp = credentialsProvider != null ? 
credentialsProvider : new BasicCredentialsProvider();
+
+        if (StringUtils.isNotBlank(username) && 
StringUtils.isNotBlank(password)) {
+            cp.setCredentials(
+                    authScope == null ? AuthScope.ANY : authScope,
+                    new UsernamePasswordCredentials(username, password)
+            );
+        }
+
+        return cp;
+    }
+
     private Response runQuery(final String endpoint, final String query, final 
String index, final String type, final Map<String, String> requestParameters) {
         final StringBuilder sb = new StringBuilder();
         if (StringUtils.isNotBlank(index)) {
@@ -183,7 +214,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
         try {
             return performRequest("POST", sb.toString(), requestParameters, 
queryEntity);
         } catch (final Exception e) {
-            throw new ElasticsearchError(e);
+            throw new ElasticsearchException(e);
         }
     }
 
@@ -203,7 +234,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
                 throw new IOException(errorMessage);
             }
         } catch (final Exception ex) {
-            throw new ElasticsearchError(ex);
+            throw new ElasticsearchException(ex);
         }
     }
 
@@ -301,7 +332,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
 
             return IndexOperationResponse.fromJsonResponse(rawResponse);
         } catch (final Exception ex) {
-            throw new ElasticsearchError(ex);
+            throw new ElasticsearchException(ex);
         }
     }
 
@@ -340,7 +371,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
             parseResponseWarningHeaders(response);
             return new 
DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
         } catch (final Exception ex) {
-            throw new RuntimeException(ex);
+            throw new ElasticsearchException(ex);
         }
     }
 
@@ -389,8 +420,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
 
             return (Map<String, Object>) mapper.readValue(body, 
Map.class).get("_source");
         } catch (final Exception ex) {
-            getLogger().error("", ex);
-            return null;
+            throw new ElasticsearchException(ex);
         }
     }
 
@@ -414,7 +444,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
             final Response response = runQuery("_search", query, index, type, 
requestParameters);
             return buildSearchResponse(response);
         } catch (final Exception ex) {
-            throw new RuntimeException(ex);
+            throw new ElasticsearchException(ex);
         }
     }
 
@@ -425,7 +455,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
             final Response response = performRequest("POST", 
"/_search/scroll", Collections.emptyMap(), scrollEntity);
             return buildSearchResponse(response);
         } catch (final Exception ex) {
-            throw new RuntimeException(ex);
+            throw new ElasticsearchException(ex);
         }
     }
 
@@ -447,7 +477,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
 
             return (String) mapper.readValue(body, Map.class).get("id");
         } catch (final Exception ex) {
-            throw new RuntimeException(ex);
+            throw new ElasticsearchException(ex);
         }
     }
 
@@ -472,11 +502,10 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
             if (404 == re.getResponse().getStatusLine().getStatusCode()) {
                 getLogger().debug("Point in Time {} not found in Elasticsearch 
for deletion, ignoring", pitId);
                 return new DeleteOperationResponse(0);
-            } else {
-                throw new RuntimeException(re);
             }
+            throw new ElasticsearchException(re);
         } catch (final Exception ex) {
-            throw new RuntimeException(ex);
+            throw new ElasticsearchException(ex);
         }
     }
 
@@ -501,11 +530,10 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
             if (404 == re.getResponse().getStatusLine().getStatusCode()) {
                 getLogger().debug("Scroll Id {} not found in Elasticsearch for 
deletion, ignoring", scrollId);
                 return new DeleteOperationResponse(0);
-            } else {
-                throw new RuntimeException(re);
             }
+            throw new ElasticsearchException(re);
         } catch (final Exception ex) {
-            throw new RuntimeException(ex);
+            throw new ElasticsearchException(ex);
         }
     }
 
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchStringLookupService.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchStringLookupService.java
index c408f5f..3be0aaa 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchStringLookupService.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchStringLookupService.java
@@ -84,7 +84,7 @@ public class ElasticSearchStringLookupService extends 
AbstractControllerService
     }
 
     @Override
-    public Optional<String> lookup(Map<String, Object> coordinates) throws 
LookupFailureException {
+    public Optional<String> lookup(final Map<String, Object> coordinates) 
throws LookupFailureException {
         try {
             final String id = (String) coordinates.get(ID);
             final Map<String, Object> enums = esClient.get(index, type, id, 
null);
@@ -93,7 +93,7 @@ public class ElasticSearchStringLookupService extends 
AbstractControllerService
             } else {
                 return Optional.ofNullable(mapper.writeValueAsString(enums));
             }
-        } catch (IOException e) {
+        } catch (final IOException | ElasticsearchException e) {
             throw new LookupFailureException(e);
         }
     }
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.groovy
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.groovy
index 9cb9885..4c03125 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.groovy
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.groovy
@@ -22,6 +22,7 @@ import org.apache.maven.artifact.versioning.ComparableVersion
 import org.apache.nifi.elasticsearch.DeleteOperationResponse
 import org.apache.nifi.elasticsearch.ElasticSearchClientService
 import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl
+import org.apache.nifi.elasticsearch.ElasticsearchException
 import org.apache.nifi.elasticsearch.IndexOperationRequest
 import org.apache.nifi.elasticsearch.IndexOperationResponse
 import org.apache.nifi.elasticsearch.SearchResponse
@@ -467,21 +468,24 @@ class ElasticSearchClientService_IT {
     void testDeleteById() throws Exception {
         final String ID = "1"
         final def originalDoc = service.get(INDEX, TYPE, ID, null)
-        DeleteOperationResponse response = service.deleteById(INDEX, TYPE, ID, 
null)
-        Assert.assertNotNull(response)
-        Assert.assertTrue(response.getTook() > 0)
-        def doc = service.get(INDEX, TYPE, ID, null)
-        Assert.assertNull(doc)
-        doc = service.get(INDEX, TYPE, "2", null)
-        Assert.assertNotNull(doc)
-
-        // replace the deleted doc
-        service.add(new IndexOperationRequest(INDEX, TYPE, "1", originalDoc, 
IndexOperationRequest.Operation.Index), null)
-        waitForIndexRefresh() // (affects later tests using _search or _bulk)
+        try {
+            DeleteOperationResponse response = service.deleteById(INDEX, TYPE, 
ID, null)
+            Assert.assertNotNull(response)
+            Assert.assertTrue(response.getTook() > 0)
+            final ElasticsearchException ee = 
Assert.assertThrows(ElasticsearchException.class, { ->
+                service.get(INDEX, TYPE, ID, null) })
+            Assert.assertTrue(ee.isNotFound())
+            final def doc = service.get(INDEX, TYPE, "2", null)
+            Assert.assertNotNull(doc)
+        } finally {
+            // replace the deleted doc
+            service.add(new IndexOperationRequest(INDEX, TYPE, "1", 
originalDoc, IndexOperationRequest.Operation.Index), null)
+            waitForIndexRefresh() // (affects later tests using _search or 
_bulk)
+        }
     }
 
     @Test
-    void testGet() throws IOException {
+    void testGet() {
         Map old
         1.upto(15) { index ->
             String id = String.valueOf(index)
@@ -493,6 +497,12 @@ class ElasticSearchClientService_IT {
     }
 
     @Test
+    void testGetNotFound() {
+        final ElasticsearchException ee = 
Assert.assertThrows(ElasticsearchException.class, { -> service.get(INDEX, TYPE, 
"not_found", null) })
+        Assert.assertTrue(ee.isNotFound())
+    }
+
+    @Test
     void testSSL() {
         final String serviceIdentifier = SSLContextService.class.getName()
         final SSLContextService sslContext = mock(SSLContextService.class)
@@ -671,8 +681,10 @@ class ElasticSearchClientService_IT {
         deletes.add(new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, null, 
IndexOperationRequest.Operation.Delete))
         Assert.assertFalse(service.bulk(deletes, [refresh: 
"true"]).hasErrors())
         waitForIndexRefresh() // wait 1s for index refresh (doesn't prevent 
GET but affects later tests using _search or _bulk)
-        Assert.assertNull(service.get(INDEX, TYPE, TEST_ID, null))
-        Assert.assertNull(service.get(INDEX, TYPE, UPSERTED_ID, null))
+        ElasticsearchException ee = 
Assert.assertThrows(ElasticsearchException.class, { -> service.get(INDEX, TYPE, 
TEST_ID, null) })
+        Assert.assertTrue(ee.isNotFound())
+        ee = Assert.assertThrows(ElasticsearchException.class, { -> 
service.get(INDEX, TYPE, UPSERTED_ID, null) })
+        Assert.assertTrue(ee.isNotFound())
     }
 
     @Test
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java
index 90bd461..2336a9c 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java
@@ -20,6 +20,7 @@ package org.apache.nifi.processors.elasticsearch;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
 import org.apache.nifi.elasticsearch.OperationResponse;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
@@ -56,6 +57,7 @@ public abstract class AbstractByQueryElasticsearch extends 
AbstractProcessor imp
         final Set<Relationship> rels = new HashSet<>();
         rels.add(REL_SUCCESS);
         rels.add(REL_FAILURE);
+        rels.add(REL_RETRY);
         relationships = Collections.unmodifiableSet(rels);
 
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
@@ -138,12 +140,21 @@ public abstract class AbstractByQueryElasticsearch 
extends AbstractProcessor imp
             input = session.putAllAttributes(input, attrs);
 
             session.transfer(input, REL_SUCCESS);
+        } catch (final ElasticsearchException ese) {
+            final String msg = String.format("Encountered a server-side 
problem with Elasticsearch. %s",
+                    ese.isElastic() ? "Routing to retry." : "Routing to 
failure");
+            getLogger().error(msg, ese);
+            if (input != null) {
+                session.penalize(input);
+                input = session.putAttribute(input, getErrorAttribute(), 
ese.getMessage());
+                session.transfer(input, ese.isElastic() ? REL_RETRY : 
REL_FAILURE);
+            }
         } catch (final Exception e) {
+            getLogger().error("Error running \"by query\" operation: ", e);
             if (input != null) {
                 input = session.putAttribute(input, getErrorAttribute(), 
e.getMessage());
                 session.transfer(input, REL_FAILURE);
             }
-            getLogger().error("Error running \"by query\" operation: ", e);
             context.yield();
         }
     }
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
index 7f1bb31..8af8770 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
@@ -22,6 +22,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
 import org.apache.nifi.elasticsearch.SearchResponse;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
@@ -168,9 +169,19 @@ public abstract class AbstractJsonQueryElasticsearch<Q 
extends JsonQueryParamete
             final SearchResponse response = doQuery(queryJsonParameters, 
hitsFlowFiles, session, context, input, stopWatch);
 
             finishQuery(input, queryJsonParameters, session, context, 
response);
+        } catch (final ElasticsearchException ese) {
+            final String msg = String.format("Encountered a server-side 
problem with Elasticsearch. %s",
+                    ese.isElastic() ? "Routing to retry." : "Routing to 
failure");
+            getLogger().error(msg, ese);
+            if (input != null) {
+                session.penalize(input);
+                input = session.putAttribute(input, 
"elasticsearch.query.error", ese.getMessage());
+                session.transfer(input, ese.isElastic() ? REL_RETRY : 
REL_FAILURE);
+            }
         } catch (Exception ex) {
-            getLogger().error("Error processing flowfile.", ex);
+            getLogger().error("Could not query documents.", ex);
             if (input != null) {
+                input = session.putAttribute(input, 
"elasticsearch.query.error", ex.getMessage());
                 session.transfer(input, REL_FAILURE);
             }
             context.yield();
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
index 18d5de8..c6fcbee 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
@@ -34,7 +34,7 @@ import java.util.Map;
         @WritesAttribute(attribute = "elasticsearch.delete.error", description 
= "The error message provided by Elasticsearch if there is an error running the 
delete.")
 })
 @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
-@Tags({ "elastic", "elasticsearch", "delete", "query"})
+@Tags({ "elastic", "elasticsearch", "elasticsearch5", "elasticsearch6", 
"elasticsearch7", "delete", "query"})
 @CapabilityDescription("Delete from an Elasticsearch index using a query. The 
query can be loaded from a flowfile body " +
         "or from the Query parameter.")
 @DynamicProperty(
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
new file mode 100644
index 0000000..e416281
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", 
"elasticsearch7", "put", "index", "record"})
+@CapabilityDescription("Elasticsearch get processor that uses the official 
Elastic REST client libraries. " +
+        "Note that the full body of the document will be read into memory 
before being written to a FlowFile for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename 
attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "elasticsearch.index", description = "The 
Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "elasticsearch.type", description = "The 
Elasticsearch document type"),
+        @WritesAttribute(attribute = "elasticsearch.get.error", description = 
"The error message provided by Elasticsearch if there is an error fetching the 
document.")
+})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query 
parameter in the Elasticsearch URL used for processing.")
+public class GetElasticsearch extends AbstractProcessor implements 
ElasticsearchRestProcessor {
+    static final AllowableValue FLOWFILE_CONTENT = new AllowableValue(
+            "flowfile-content",
+            "FlowFile Content",
+            "Output the retrieved document as the FlowFile content."
+    );
+
+    static final AllowableValue FLOWFILE_ATTRIBUTE = new AllowableValue(
+            "flowfile-attribute",
+            "FlowFile Attribute",
+            "Output the retrieved document as a FlowFile attribute specified 
by the Attribute Name."
+    );
+
+    static final PropertyDescriptor ID = new PropertyDescriptor.Builder()
+            .name("get-es-id")
+            .displayName("Document Id")
+            .description("The _id of the document to retrieve.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor DESTINATION = new 
PropertyDescriptor.Builder()
+            .name("get-es-destination")
+            .displayName("Destination")
+            .description("Indicates whether the retrieved document is written 
to the FlowFile content or a FlowFile attribute.")
+            .required(true)
+            .allowableValues(FLOWFILE_CONTENT, FLOWFILE_ATTRIBUTE)
+            .defaultValue(FLOWFILE_CONTENT.getValue())
+            .build();
+
+    static final PropertyDescriptor ATTRIBUTE_NAME = new 
PropertyDescriptor.Builder()
+            .name("get-es-attribute-name")
+            .displayName("Attribute Name")
+            .description("The name of the FlowFile attribute to use for the 
retrieved document output.")
+            .required(true)
+            .defaultValue("elasticsearch.doc")
+            .dependsOn(DESTINATION, FLOWFILE_ATTRIBUTE)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final Relationship REL_DOC = new 
Relationship.Builder().name("document")
+            .description("Fetched documents are routed to this relationship.")
+            .build();
+
+    static final Relationship REL_NOT_FOUND = new 
Relationship.Builder().name("not_found")
+            .description("A FlowFile is routed to this relationship if the 
specified document does not exist in the Elasticsearch cluster.")
+            .build();
+
+    static final List<PropertyDescriptor> DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
+            ID, INDEX, TYPE, DESTINATION, ATTRIBUTE_NAME, CLIENT_SERVICE
+    ));
+    static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_DOC, REL_FAILURE, REL_RETRY, REL_NOT_FOUND
+    )));
+
+    private volatile ElasticSearchClientService clientService;
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .dynamic(true)
+                .build();
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        this.clientService = 
context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        FlowFile input = session.get();
+
+        final String id = 
context.getProperty(ID).evaluateAttributeExpressions(input).getValue();
+        final String index = 
context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+        final String type  = 
context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+
+        final String destination = context.getProperty(DESTINATION).getValue();
+        final String attributeName = 
context.getProperty(ATTRIBUTE_NAME).evaluateAttributeExpressions(input).getValue();
+
+        try {
+            final StopWatch stopWatch = new StopWatch(true);
+            final Map<String, Object> doc = clientService.get(index, type, id, 
getUrlQueryParameters(context, input));
+
+            final Map<String, String> attributes = new HashMap<>(4, 1);
+            attributes.put("filename", id);
+            attributes.put("elasticsearch.index", index);
+            if (type != null) {
+                attributes.put("elasticsearch.type", type);
+            }
+            final String json = mapper.writeValueAsString(doc);
+            FlowFile documentFlowFile = input != null ? input : 
session.create();
+            if (FLOWFILE_CONTENT.getValue().equals(destination)) {
+                documentFlowFile = session.write(documentFlowFile, out -> 
out.write(json.getBytes()));
+            } else {
+                attributes.put(attributeName, json);
+            }
+
+            documentFlowFile = session.putAllAttributes(documentFlowFile, 
attributes);
+            session.getProvenanceReporter().receive(documentFlowFile, 
clientService.getTransitUrl(index, type), 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.transfer(documentFlowFile, REL_DOC);
+        } catch (final ElasticsearchException ese) {
+            if (ese.isNotFound()) {
+                if (input != null) {
+                    session.transfer(input, REL_NOT_FOUND);
+                } else {
+                    getLogger().warn("Document with _id {} not found in index 
{} (and type {})", id, index, type);
+                }
+            } else {
+                final String msg = String.format("Encountered a server-side 
problem with Elasticsearch. %s",
+                        ese.isElastic() ? "Routing to retry." : "Routing to 
failure");
+                getLogger().error(msg, ese);
+                if (input != null) {
+                    session.penalize(input);
+                    input = session.putAttribute(input, 
"elasticsearch.get.error", ese.getMessage());
+                    session.transfer(input, ese.isElastic() ? REL_RETRY : 
REL_FAILURE);
+                }
+            }
+        } catch (final Exception ex) {
+            getLogger().error("Could not fetch document.", ex);
+            if (input != null) {
+                input = session.putAttribute(input, "elasticsearch.get.error", 
ex.getMessage());
+                session.transfer(input, REL_FAILURE);
+            }
+            context.yield();
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
index a460c3a..a0bce42 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
@@ -39,11 +39,12 @@ import java.util.concurrent.TimeUnit;
     @WritesAttribute(attribute = "mime.type", description = 
"application/json"),
     @WritesAttribute(attribute = "aggregation.name", description = "The name 
of the aggregation whose results are in the output flowfile"),
     @WritesAttribute(attribute = "aggregation.number", description = "The 
number of the aggregation whose results are in the output flowfile"),
-    @WritesAttribute(attribute = "hit.count", description = "The number of 
hits that are in the output flowfile")
+    @WritesAttribute(attribute = "hit.count", description = "The number of 
hits that are in the output flowfile"),
+    @WritesAttribute(attribute = "elasticsearch.query.error", description = 
"The error message provided by Elasticsearch if there is an error querying the 
index.")
 })
 @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
 @EventDriven
-@Tags({"elasticsearch", "elasticsearch 5", "query", "read", "get", "json"})
+@Tags({"elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", 
"query", "read", "get", "json"})
 @CapabilityDescription("A processor that allows the user to run a query (with 
aggregations) written with the " +
         "Elasticsearch JSON DSL. It does not automatically paginate queries 
for the user. If an incoming relationship is added to this " +
         "processor, it will use the flowfile's content for the query. Care 
should be taken on the size of the query because the entire response " +
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
index 701d43c..f2384b6 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
@@ -42,11 +42,12 @@ import java.util.List;
     @WritesAttribute(attribute = "aggregation.name", description = "The name 
of the aggregation whose results are in the output flowfile"),
     @WritesAttribute(attribute = "aggregation.number", description = "The 
number of the aggregation whose results are in the output flowfile"),
     @WritesAttribute(attribute = "page.number", description = "The number of 
the page (request) in which the results were returned that are in the output 
flowfile"),
-    @WritesAttribute(attribute = "hit.count", description = "The number of 
hits that are in the output flowfile")
+    @WritesAttribute(attribute = "hit.count", description = "The number of 
hits that are in the output flowfile"),
+    @WritesAttribute(attribute = "elasticsearch.query.error", description = 
"The error message provided by Elasticsearch if there is an error querying the 
index.")
 })
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @EventDriven
-@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "read", 
"json"})
+@Tags({"elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", 
"query", "scroll", "page", "read", "json"})
 @CapabilityDescription("A processor that allows the user to run a paginated 
query (with aggregations) written with the Elasticsearch JSON DSL. " +
         "It will use the flowfile's content for the query unless the QUERY 
attribute is populated. " +
         "Search After/Point in Time queries must include a valid \"sort\" 
field.")
@@ -58,7 +59,7 @@ import java.util.List;
                 "These parameters will override any matching parameters in the 
query request body")
 @SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"Care should be taken on the size of each page because each response " +
         "from Elasticsearch will be loaded into memory all at once and 
converted into the resulting flowfiles.")
-public class PaginatedJsonQueryElasticsearch extends 
AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {
+public class PaginatedJsonQueryElasticsearch extends 
AbstractPaginatedJsonQueryElasticsearch {
     private static final List<PropertyDescriptor> propertyDescriptors;
 
     static {
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index ec3c3f2..6fd1ddb 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -21,6 +21,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -30,7 +32,7 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.elasticsearch.ElasticSearchClientService;
-import org.apache.nifi.elasticsearch.ElasticsearchError;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
 import org.apache.nifi.elasticsearch.IndexOperationRequest;
 import org.apache.nifi.elasticsearch.IndexOperationResponse;
 import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -72,14 +74,17 @@ import java.util.Optional;
 import java.util.Set;
 
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "put", 
"index", "record"})
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", 
"elasticsearch7", "put", "index", "record"})
 @CapabilityDescription("A record-aware Elasticsearch put processor that uses 
the official Elastic REST client libraries.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "elasticsearch.put.error", description = 
"The error message provided by Elasticsearch if there is an error indexing the 
documents.")
+})
 @DynamicProperty(
         name = "The name of a URL query parameter to add",
         value = "The value of the URL query parameter",
         expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
         description = "Adds the specified property name/value as a query 
parameter in the Elasticsearch URL used for processing. " +
-                "These parameters will override any matching parameters in the 
query request body")
+                "These parameters will override any matching parameters in the 
_bulk request body")
 public class PutElasticsearchRecord extends AbstractProcessor implements 
ElasticsearchRestProcessor {
     static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
         .name("put-es-record-reader")
@@ -346,7 +351,7 @@ public class PutElasticsearchRecord extends 
AbstractProcessor implements Elastic
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
-        final FlowFile input = session.get();
+        FlowFile input = session.get();
         if (input == null) {
             return;
         }
@@ -415,18 +420,21 @@ public class PutElasticsearchRecord extends 
AbstractProcessor implements Elastic
                     badRecords.add(bad);
                 }
             }
-        } catch (final ElasticsearchError ese) {
+        } catch (final ElasticsearchException ese) {
             final String msg = String.format("Encountered a server-side 
problem with Elasticsearch. %s",
-                    ese.isElastic() ? "Moving to retry." : "Moving to 
failure");
+                    ese.isElastic() ? "Routing to retry." : "Routing to 
failure");
             getLogger().error(msg, ese);
             final Relationship rel = ese.isElastic() ? REL_RETRY : REL_FAILURE;
             session.penalize(input);
+            input = session.putAttribute(input, "elasticsearch.put.error", 
ese.getMessage());
             session.transfer(input, rel);
             removeBadRecordFlowFiles(badRecords, session);
             return;
         } catch (final Exception ex) {
             getLogger().error("Could not index documents.", ex);
+            input = session.putAttribute(input, "elasticsearch.put.error", 
ex.getMessage());
             session.transfer(input, REL_FAILURE);
+            context.yield();
             removeBadRecordFlowFiles(badRecords, session);
             return;
         }
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
index 073f779..93adcfe 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
@@ -55,13 +55,14 @@ import java.util.Set;
     @WritesAttribute(attribute = "aggregation.name", description = "The name 
of the aggregation whose results are in the output flowfile"),
     @WritesAttribute(attribute = "aggregation.number", description = "The 
number of the aggregation whose results are in the output flowfile"),
     @WritesAttribute(attribute = "page.number", description = "The number of 
the page (request) in which the results were returned that are in the output 
flowfile"),
-    @WritesAttribute(attribute = "hit.count", description = "The number of 
hits that are in the output flowfile")
+    @WritesAttribute(attribute = "hit.count", description = "The number of 
hits that are in the output flowfile"),
+    @WritesAttribute(attribute = "elasticsearch.query.error", description = 
"The error message provided by Elasticsearch if there is an error querying the 
index.")
 })
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @TriggerSerially
 @PrimaryNodeOnly
 @DefaultSchedule(period="1 min")
-@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", 
"search", "json"})
+@Tags({"elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", 
"query", "scroll", "page", "search", "json"})
 @CapabilityDescription("A processor that allows the user to repeatedly run a 
paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
         "Search After/Point in Time queries must include a valid \"sort\" 
field. The processor will retrieve multiple pages of results " +
         "until either no more results are available or the Pagination Keep 
Alive expiration is reached, after which the query will " +
@@ -77,7 +78,7 @@ import java.util.Set;
         "(when the current time is later than the last query execution plus 
the Pagination Keep Alive interval).")
 @SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"Care should be taken on the size of each page because each response " +
         "from Elasticsearch will be loaded into memory all at once and 
converted into the resulting flowfiles.")
-public class SearchElasticsearch extends 
AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {
+public class SearchElasticsearch extends 
AbstractPaginatedJsonQueryElasticsearch {
     static final String STATE_SCROLL_ID = "scrollId";
     static final String STATE_PIT_ID = "pitId";
     static final String STATE_SEARCH_AFTER = "searchAfter";
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java
index ba98025..a64baba 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java
@@ -34,7 +34,7 @@ import java.util.Map;
         @WritesAttribute(attribute = "elasticsearch.update.error", description 
= "The error message provided by Elasticsearch if there is an error running the 
update.")
 })
 @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
-@Tags({ "elastic", "elasticsearch", "update", "query"})
+@Tags({ "elastic", "elasticsearch", "elasticsearch5", "elasticsearch6", 
"elasticsearch7", "update", "query"})
 @CapabilityDescription("Update documents in an Elasticsearch index using a 
query. The query can be loaded from a flowfile body " +
         "or from the Query parameter.")
 @DynamicProperty(
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 749ba53..69bbf40 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -19,3 +19,4 @@ 
org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch
 org.apache.nifi.processors.elasticsearch.SearchElasticsearch
 org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord
 org.apache.nifi.processors.elasticsearch.UpdateByQueryElasticsearch
+org.apache.nifi.processors.elasticsearch.GetElasticsearch
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
index 51fdec1..3ae81a2 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
@@ -279,7 +279,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends 
AbstractJsonQueryEla
 
         runOnce(runner)
 
-        final TestElasticsearchClientService service = 
runner.getControllerService("esService") as TestElasticsearchClientService
+        final TestElasticsearchClientService service = getService(runner)
         if (getProcessor() instanceof SearchElasticsearch || getProcessor() 
instanceof PaginatedJsonQueryElasticsearch) {
             Assert.assertEquals(3, service.getRequestParameters().size())
             Assert.assertEquals("600s", 
service.getRequestParameters().get("scroll"))
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
index afffcb2..274e6a2 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
@@ -178,7 +178,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest 
extends AbstractJsonQ
         // check error was caught and logged
         assertThat(runner.getLogger().getErrorMessages().stream()
                 .anyMatch({ logMessage ->
-                    logMessage.getMsg().contains("Error processing flowfile") 
&&
+                    logMessage.getMsg().contains("Could not query documents") 
&&
                             logMessage.getThrowable().getMessage() == 
"Simulated IOException - initialisePointInTime"
                 }),
                 is(true)
@@ -199,7 +199,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest 
extends AbstractJsonQ
         // check error was caught and logged
         assertThat(runner.getLogger().getErrorMessages().stream()
                 .anyMatch({ logMessage ->
-                    logMessage.getMsg().contains("Error processing flowfile") 
&&
+                    logMessage.getMsg().contains("Could not query documents") 
&&
                             logMessage.getThrowable().getMessage() == "Query 
using pit/search_after must contain a \"sort\" field"
                 }),
                 is(true)
@@ -213,7 +213,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest 
extends AbstractJsonQ
         testCounts(runner, 0, 0, isInput() ? 1 : 0, 0)
         assertThat(runner.getLogger().getErrorMessages().stream()
                 .anyMatch({ logMessage ->
-                    logMessage.getMsg().contains("Error processing flowfile") 
&&
+                    logMessage.getMsg().contains("Could not query documents") 
&&
                             logMessage.getThrowable().getMessage() == "Query 
using pit/search_after must contain a \"sort\" field"
                 }),
                 is(true)
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy
new file mode 100644
index 0000000..5ffe818
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.provenance.ProvenanceEventType
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.hamcrest.MatcherAssert
+import org.junit.Assert
+import org.junit.Test
+
+import static groovy.json.JsonOutput.toJson
+import static org.hamcrest.CoreMatchers.equalTo
+import static org.hamcrest.CoreMatchers.is
+import static org.hamcrest.MatcherAssert.assertThat
+import static org.junit.Assert.assertThrows
+
+class GetElasticsearchTest {
+    static final String INDEX_NAME = "messages"
+
+    @Test
+    void testMandatoryProperties() {
+        final TestRunner runner = createRunner()
+        runner.removeProperty(GetElasticsearch.CLIENT_SERVICE)
+        runner.removeProperty(GetElasticsearch.INDEX)
+        runner.removeProperty(GetElasticsearch.TYPE)
+        runner.removeProperty(GetElasticsearch.ID)
+        runner.removeProperty(GetElasticsearch.DESTINATION)
+        runner.removeProperty(GetElasticsearch.ATTRIBUTE_NAME)
+
+        final AssertionError assertionError = 
assertThrows(AssertionError.class, runner.&run)
+        assertThat(assertionError.getMessage(), 
equalTo(String.format("Processor has 3 validation failures:\n" +
+                "'%s' is invalid because %s is required\n" +
+                "'%s' is invalid because %s is required\n" +
+                "'%s' is invalid because %s is required\n",
+                GetElasticsearch.ID.getDisplayName(), 
GetElasticsearch.ID.getDisplayName(),
+                GetElasticsearch.INDEX.getDisplayName(), 
GetElasticsearch.INDEX.getDisplayName(),
+                GetElasticsearch.CLIENT_SERVICE.getDisplayName(), 
GetElasticsearch.CLIENT_SERVICE.getDisplayName()
+        )))
+    }
+
+    @Test
+    void testInvalidProperties() {
+        final TestRunner runner = createRunner()
+        runner.setProperty(GetElasticsearch.CLIENT_SERVICE, "not-a-service")
+        runner.setProperty(GetElasticsearch.INDEX, "")
+        runner.setProperty(GetElasticsearch.TYPE, "")
+        runner.setProperty(GetElasticsearch.ID, "")
+        runner.setProperty(GetElasticsearch.DESTINATION, "not-valid")
+        runner.setProperty(GetElasticsearch.ATTRIBUTE_NAME, "")
+
+        final AssertionError assertionError = 
assertThrows(AssertionError.class, runner.&run)
+        assertThat(assertionError.getMessage(), 
equalTo(String.format("Processor has 6 validation failures:\n" +
+                "'%s' validated against '' is invalid because %s cannot be 
empty\n" +
+                "'%s' validated against '' is invalid because %s cannot be 
empty\n" +
+                "'%s' validated against '' is invalid because %s cannot be 
empty\n" +
+                "'%s' validated against 'not-valid' is invalid because Given 
value not found in allowed set '%s'\n" +
+                "'%s' validated against 'not-a-service' is invalid because 
Property references a Controller Service that does not exist\n" +
+                "'%s' validated against 'not-a-service' is invalid because 
Invalid Controller Service: not-a-service is not a valid Controller Service 
Identifier\n",
+                GetElasticsearch.ID.getName(), GetElasticsearch.ID.getName(),
+                GetElasticsearch.INDEX.getName(), 
GetElasticsearch.INDEX.getName(),
+                GetElasticsearch.TYPE.getName(), 
GetElasticsearch.TYPE.getName(),
+                GetElasticsearch.DESTINATION.getName(), 
[GetElasticsearch.FLOWFILE_CONTENT.getValue(), 
GetElasticsearch.FLOWFILE_ATTRIBUTE.getValue()].join(", "),
+                GetElasticsearch.CLIENT_SERVICE.getDisplayName(),
+                GetElasticsearch.CLIENT_SERVICE.getDisplayName()
+        )))
+    }
+
+    @Test
+    void testInvalidAttributeName() {
+        final TestRunner runner = createRunner()
+        runner.setProperty(GetElasticsearch.DESTINATION, 
GetElasticsearch.FLOWFILE_ATTRIBUTE)
+        runner.setProperty(GetElasticsearch.ATTRIBUTE_NAME, "")
+
+        final AssertionError assertionError = 
assertThrows(AssertionError.class, runner.&run)
+        assertThat(assertionError.getMessage(), 
equalTo(String.format("Processor has 1 validation failures:\n" +
+                "'%s' validated against '' is invalid because %s cannot be 
empty\n",
+                GetElasticsearch.ATTRIBUTE_NAME.getName(), 
GetElasticsearch.ATTRIBUTE_NAME.getName()
+        )))
+    }
+
+    @Test
+    void testFetch() throws Exception {
+        final TestRunner runner = createRunner()
+
+        runProcessor(runner)
+
+        testCounts(runner, 1, 0, 0, 0)
+        final FlowFile doc = 
runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).get(0)
+        assertOutputContent(doc.getContent())
+        MatcherAssert.assertThat(
+                runner.getProvenanceEvents().stream().filter({ pe ->
+                    pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                            pe.getAttribute("uuid") == doc.getAttribute("uuid")
+                }).count(),
+                is(1L)
+        )
+    }
+
+    @Test
+    void testInputHandlingDestinationContent() {
+        final TestRunner runner = createRunner()
+        runner.setProperty(GetElasticsearch.DESTINATION, 
GetElasticsearch.FLOWFILE_CONTENT)
+
+        runner.setIncomingConnection(true)
+        runProcessor(runner)
+        testCounts(runner, 1, 0, 0, 0)
+        MockFlowFile doc = 
runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).get(0)
+        assertOutputContent(doc.getContent())
+        assertCommonAttributes(doc)
+        assertOutputAttribute(doc)
+        reset(runner)
+
+        runner.setIncomingConnection(false)
+        runner.run()
+        testCounts(runner, 1, 0, 0, 0)
+        doc = 
runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).get(0)
+        assertOutputContent(doc.getContent())
+        assertCommonAttributes(doc)
+        assertOutputAttribute(doc)
+    }
+
+    @Test
+    void testDestinationAttribute() {
+        final TestRunner runner = createRunner()
+        runner.setProperty(GetElasticsearch.DESTINATION, 
GetElasticsearch.FLOWFILE_ATTRIBUTE)
+
+        runner.setIncomingConnection(true)
+        runProcessor(runner)
+        testCounts(runner, 1, 0, 0, 0)
+        MockFlowFile doc = 
runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).get(0)
+        assertThat(doc.getContent(), is("test"))
+        assertCommonAttributes(doc)
+        assertOutputAttribute(doc, true)
+        reset(runner)
+
+        // non-default attribute name and fetch without type
+        runner.setProperty(GetElasticsearch.ATTRIBUTE_NAME, "my_attr")
+        runner.removeProperty(GetElasticsearch.TYPE)
+        runner.setIncomingConnection(false)
+        runner.run()
+        testCounts(runner, 1, 0, 0, 0)
+        doc = 
runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).get(0)
+        assertThat(doc.getContent(), is(""))
+        assertCommonAttributes(doc, false)
+        assertOutputAttribute(doc, true, "my_attr")
+    }
+
+    @Test
+    void testErrorDuringFetch() throws Exception {
+        final TestRunner runner = createRunner()
+        getService(runner).setThrowErrorInGet(true)
+
+        runner.setIncomingConnection(true)
+        runProcessor(runner)
+        testCounts(runner, 0, 1, 0, 0)
+        reset(runner)
+
+        runner.setIncomingConnection(false)
+        runner.run()
+        testCounts(runner, 0, 0, 0, 0)
+    }
+
+    @Test
+    void testNotFound() throws Exception {
+        final TestRunner runner = createRunner()
+        getService(runner).setThrowNotFoundInGet(true)
+
+        runProcessor(runner)
+        testCounts(runner, 0, 0, 0, 1)
+        reset(runner)
+    }
+
+    @Test
+    void testRequestParameters() {
+        final TestRunner runner = createRunner()
+        runner.setProperty("refresh", "true")
+        runner.setProperty("_source", '${source}')
+        runner.setVariable("source", "msg")
+
+        runProcessor(runner)
+
+        final TestElasticsearchClientService service = getService(runner)
+        Assert.assertEquals(2, service.getRequestParameters().size())
+        Assert.assertEquals("true", 
service.getRequestParameters().get("refresh"))
+        Assert.assertEquals("msg", 
service.getRequestParameters().get("_source"))
+    }
+
+    private static void testCounts(final TestRunner runner, final int doc, 
final int failure, final int retry, final int notFound) {
+        runner.assertTransferCount(GetElasticsearch.REL_DOC, doc)
+        runner.assertTransferCount(GetElasticsearch.REL_FAILURE, failure)
+        runner.assertTransferCount(GetElasticsearch.REL_RETRY, retry)
+        runner.assertTransferCount(GetElasticsearch.REL_NOT_FOUND, notFound)
+    }
+
+    private static void assertOutputContent(final String content) {
+        assertThat(content, is(toJson(["msg": "one"])))
+    }
+
+    private static void assertOutputAttribute(final MockFlowFile doc, final 
boolean attributeOutput = false, final String attr = "elasticsearch.doc") {
+        if (attributeOutput) {
+            doc.assertAttributeEquals(attr, toJson(["msg": "one"]))
+        } else {
+            doc.assertAttributeNotExists(attr)
+        }
+    }
+
+    private static void assertCommonAttributes(final MockFlowFile doc, final 
boolean type = true, final boolean error = false) {
+        doc.assertAttributeEquals("filename", "doc_1")
+        doc.assertAttributeEquals("elasticsearch.index", INDEX_NAME)
+        if (type) {
+            doc.assertAttributeEquals("elasticsearch.type", "message")
+        } else {
+            doc.assertAttributeNotExists("elasticsearch.type")
+        }
+
+        if (error) {
+            doc.assertAttributeEquals("elasticsearch.get.error", "message")
+        } else {
+            doc.assertAttributeNotExists("elasticsearch.get.error")
+        }
+    }
+
+    private static TestRunner createRunner() {
+        final GetElasticsearch processor = new GetElasticsearch()
+        final TestRunner runner = TestRunners.newTestRunner(processor)
+        final TestElasticsearchClientService service = new 
TestElasticsearchClientService(false)
+        runner.addControllerService("esService", service)
+        runner.enableControllerService(service)
+        runner.setProperty(GetElasticsearch.CLIENT_SERVICE, "esService")
+        runner.setProperty(GetElasticsearch.INDEX, INDEX_NAME)
+        runner.setProperty(GetElasticsearch.TYPE, "message")
+        runner.setProperty(GetElasticsearch.ID, "doc_1")
+        runner.setProperty(GetElasticsearch.DESTINATION, 
GetElasticsearch.FLOWFILE_CONTENT)
+        runner.setProperty(GetElasticsearch.ATTRIBUTE_NAME, 
"elasticsearch.doc")
+        runner.setValidateExpressionUsage(true)
+
+        return runner
+    }
+
+    private static MockFlowFile runProcessor(final TestRunner runner) {
+        final MockFlowFile ff = runner.enqueue("test")
+        runner.run()
+        return ff
+    }
+
+    private static TestElasticsearchClientService getService(final TestRunner 
runner) {
+        return runner.getControllerService("esService", 
TestElasticsearchClientService.class)
+    }
+
+    private static void reset(final TestRunner runner) {
+        runner.clearProvenanceEvents()
+        runner.clearTransferState()
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy
index c606d36..5fc0647 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy
@@ -64,7 +64,7 @@ class SearchElasticsearchTest extends 
AbstractPaginatedJsonQueryElasticsearchTes
         testCounts(runner, 0, 0, 0, 0)
         assertThat(runner.getLogger().getErrorMessages().stream()
                 .anyMatch({ logMessage ->
-                    logMessage.getMsg().contains("Error processing flowfile") 
&&
+                    logMessage.getMsg().contains("Could not query documents") 
&&
                             logMessage.getThrowable().getMessage() == 
"Simulated IOException - scroll"
                 }),
                 is(true)
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy
index 9bd4cb6..7410380 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy
@@ -21,14 +21,20 @@ import groovy.json.JsonSlurper
 import org.apache.nifi.controller.AbstractControllerService
 import org.apache.nifi.elasticsearch.DeleteOperationResponse
 import org.apache.nifi.elasticsearch.ElasticSearchClientService
+import org.apache.nifi.elasticsearch.ElasticsearchException
 import org.apache.nifi.elasticsearch.IndexOperationRequest
 import org.apache.nifi.elasticsearch.IndexOperationResponse
 import org.apache.nifi.elasticsearch.SearchResponse
 import org.apache.nifi.elasticsearch.UpdateOperationResponse
+import org.apache.nifi.processors.elasticsearch.mock.MockElasticsearchException
+import org.elasticsearch.client.Response
+import org.elasticsearch.client.ResponseException
 
 class TestElasticsearchClientService extends AbstractControllerService 
implements ElasticSearchClientService {
     private boolean returnAggs
     private boolean throwErrorInSearch
+    private boolean throwErrorInGet
+    private boolean throwNotFoundInGet
     private boolean throwErrorInDelete
     private boolean throwErrorInPit
     private boolean throwErrorInUpdate
@@ -43,7 +49,11 @@ class TestElasticsearchClientService extends 
AbstractControllerService implement
 
     private void common(boolean throwError, Map<String, String> 
requestParameters) {
         if (throwError) {
-            throw new IOException("Simulated IOException")
+            if (throwNotFoundInGet) {
+                throw new MockElasticsearchException(false, true)
+            } else {
+                throw new IOException("Simulated IOException")
+            }
         }
         this.requestParameters = requestParameters
     }
@@ -89,7 +99,7 @@ class TestElasticsearchClientService extends 
AbstractControllerService implement
 
     @Override
     Map<String, Object> get(String index, String type, String id, Map<String, 
String> requestParameters) {
-        common(false, requestParameters)
+        common(throwErrorInGet || throwNotFoundInGet, requestParameters)
         return [ "msg": "one" ]
     }
 
@@ -298,6 +308,14 @@ class TestElasticsearchClientService extends 
AbstractControllerService implement
             "      }\n" +
             "    ]"
 
+    void setThrowNotFoundInGet(boolean throwNotFoundInGet) {
+        this.throwNotFoundInGet = throwNotFoundInGet
+    }
+
+    void setThrowErrorInGet(boolean throwErrorInGet) {
+        this.throwErrorInGet = throwErrorInGet
+    }
+
     void setThrowErrorInSearch(boolean throwErrorInSearch) {
         this.throwErrorInSearch = throwErrorInSearch
     }
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
index 21a3a8f..7d63ec5 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
@@ -29,9 +29,9 @@ class MockBulkLoadClientService extends 
AbstractMockElasticsearchClient {
     @Override
     IndexOperationResponse bulk(List<IndexOperationRequest> items, Map<String, 
String> requestParameters) {
         if (throwRetriableError) {
-            throw new MockElasticsearchError(true)
+            throw new MockElasticsearchException(true, false)
         } else if (throwFatalError) {
-            throw new MockElasticsearchError(false)
+            throw new MockElasticsearchException(false, false)
         }
 
         if (evalClosure) {
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchError.groovy
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchException.groovy
similarity index 75%
rename from 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchError.groovy
rename to 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchException.groovy
index 7b1073b..90f8bce 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchError.groovy
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchException.groovy
@@ -17,15 +17,16 @@
 
 package org.apache.nifi.processors.elasticsearch.mock
 
-import org.apache.nifi.elasticsearch.ElasticsearchError
+import org.apache.nifi.elasticsearch.ElasticsearchException
 
-class MockElasticsearchError extends ElasticsearchError {
-    MockElasticsearchError(boolean isElastic) {
+class MockElasticsearchException extends ElasticsearchException {
+    MockElasticsearchException(boolean elastic, boolean notFound) {
         this(new Exception())
-        this.isElastic = isElastic
+        this.elastic = elastic
+        this.notFound = notFound
     }
 
-    MockElasticsearchError(Exception ex) {
+    MockElasticsearchException(Exception ex) {
         super(ex)
     }
 }

Reply via email to