This is an automated email from the ASF dual-hosted git repository.
jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 1745d2a - NIFI-9388: add GetElasticsearch to fetch Elasticsearch
document using the ElasticsearchClientService - NIFI-9387: add Proxy capability
to ElasticsearchClientService - NIFI-1576: allow GetElasticsearch to run
without requiring FlowFile input
1745d2a is described below
commit 1745d2a88e3afc95fa91dda9ab23e8abe4a85ed1
Author: Chris Sampson <[email protected]>
AuthorDate: Thu Nov 18 22:06:46 2021 +0000
- NIFI-9388: add GetElasticsearch to fetch Elasticsearch document using the
ElasticsearchClientService
- NIFI-9387: add Proxy capability to ElasticsearchClientService
- NIFI-1576: allow GetElasticsearch to run without requiring FlowFile input
Signed-off-by: Joe Gresock <[email protected]>
This closes #5535.
---
.../nifi-elasticsearch-client-service-api/pom.xml | 6 +
.../elasticsearch/ElasticSearchClientService.java | 3 +
...earchError.java => ElasticsearchException.java} | 29 ++-
.../nifi-elasticsearch-client-service/pom.xml | 11 +-
.../ElasticSearchClientServiceImpl.java | 64 +++--
.../ElasticSearchStringLookupService.java | 4 +-
.../ElasticSearchClientService_IT.groovy | 40 +--
.../AbstractByQueryElasticsearch.java | 13 +-
.../AbstractJsonQueryElasticsearch.java | 13 +-
.../elasticsearch/DeleteByQueryElasticsearch.java | 2 +-
.../processors/elasticsearch/GetElasticsearch.java | 209 ++++++++++++++++
.../elasticsearch/JsonQueryElasticsearch.java | 5 +-
.../PaginatedJsonQueryElasticsearch.java | 7 +-
.../elasticsearch/PutElasticsearchRecord.java | 20 +-
.../elasticsearch/SearchElasticsearch.java | 7 +-
.../elasticsearch/UpdateByQueryElasticsearch.java | 2 +-
.../services/org.apache.nifi.processor.Processor | 1 +
.../AbstractJsonQueryElasticsearchTest.groovy | 2 +-
...tractPaginatedJsonQueryElasticsearchTest.groovy | 6 +-
.../elasticsearch/GetElasticsearchTest.groovy | 272 +++++++++++++++++++++
.../elasticsearch/SearchElasticsearchTest.groovy | 2 +-
.../TestElasticsearchClientService.groovy | 22 +-
.../mock/MockBulkLoadClientService.groovy | 4 +-
...or.groovy => MockElasticsearchException.groovy} | 11 +-
24 files changed, 680 insertions(+), 75 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
index 6a649ce..9c7160c 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
@@ -44,6 +44,12 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-proxy-configuration-api</artifactId>
+ <version>1.16.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
index d186306..138d0a9 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
@@ -24,6 +24,8 @@ import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.ssl.SSLContextService;
import java.util.List;
@@ -49,6 +51,7 @@ public interface ElasticSearchClientService extends
ControllerService {
.identifiesControllerService(SSLContextService.class)
.addValidator(Validator.VALID)
.build();
+ PropertyDescriptor PROXY_CONFIGURATION_SERVICE =
ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxySpec.HTTP);
PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("el-cs-username")
.displayName("Username")
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchError.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchException.java
similarity index 69%
rename from
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchError.java
rename to
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchException.java
index 65e11ad..9c4d259 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchError.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchException.java
@@ -20,7 +20,7 @@ package org.apache.nifi.elasticsearch;
import java.util.HashSet;
import java.util.Set;
-public class ElasticsearchError extends RuntimeException {
+public class ElasticsearchException extends RuntimeException {
/**
* These are names of common Elasticsearch exceptions where it is safe to
assume
* that it's OK to retry the operation instead of just sending it to an
error relationship.
@@ -32,17 +32,32 @@ public class ElasticsearchError extends RuntimeException {
add("NodeClosedException");
}};
- protected boolean isElastic;
+ protected boolean elastic;
- public ElasticsearchError(final Exception ex) {
+ protected boolean notFound;
+
+ public ElasticsearchException(final Exception ex) {
super(ex);
+
final boolean isKnownException =
ELASTIC_ERROR_NAMES.contains(ex.getClass().getSimpleName());
- final boolean isServiceUnavailable =
"ResponseException".equals(ex.getClass().getSimpleName())
- && ex.getMessage().contains("503 Service Unavailable");
- isElastic = isKnownException || isServiceUnavailable;
+
+ final boolean isServiceUnavailable;
+ if ("ResponseException".equals(ex.getClass().getSimpleName())) {
+ isServiceUnavailable = ex.getMessage().contains("503 Service
Unavailable");
+
+ notFound = ex.getMessage().contains("404 Not Found");
+ } else {
+ isServiceUnavailable = false;
+ }
+
+ elastic = isKnownException || isServiceUnavailable;
}
public boolean isElastic() {
- return isElastic;
+ return elastic;
+ }
+
+ public boolean isNotFound() {
+ return notFound;
}
}
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
index ae38810..796ac21 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
@@ -76,6 +76,12 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-proxy-configuration-api</artifactId>
+ <version>1.16.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-elasticsearch-client-service-api</artifactId>
<version>1.16.0-SNAPSHOT</version>
<scope>provided</scope>
@@ -143,9 +149,10 @@
to an Elastic-provided Elasticsearch instead of an instance
provided by someone else (e.g. AWS OpenSearch)
see:
https://opensearch.org/blog/community/2021/08/community-clients/ for more info.
- Note: the low-level elasticsearch-rest-client remains licensed
with Apache 2.0 even after the move
+ Note: the low-level elasticsearch-rest-client remains licensed
with Apache 2.0
+
(https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_license.html)
even after the move
of the main Elasticsearch product and
elasticsearch-rest-high-level-client to Elastic 2.0/SSPL 1.0 in v7.11.0+ -->
- <version>7.10.2</version>
+ <version>7.13.4</version>
<scope>compile</scope>
<exclusions>
<exclusion>
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index 6575050..202c873 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -36,6 +36,8 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StopWatch;
@@ -50,6 +52,7 @@ import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
+import java.net.Proxy;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@@ -79,6 +82,7 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
props.add(ElasticSearchClientService.USERNAME);
props.add(ElasticSearchClientService.PASSWORD);
props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE);
+ props.add(ElasticSearchClientService.PROXY_CONFIGURATION_SERVICE);
props.add(ElasticSearchClientService.CONNECT_TIMEOUT);
props.add(ElasticSearchClientService.SOCKET_TIMEOUT);
props.add(ElasticSearchClientService.RETRY_TIMEOUT);
@@ -129,6 +133,8 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
final Integer connectTimeout =
context.getProperty(CONNECT_TIMEOUT).asInteger();
final Integer readTimeout =
context.getProperty(SOCKET_TIMEOUT).asInteger();
+ final ProxyConfigurationService proxyConfigurationService =
context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
+
final HttpHost[] hh = new HttpHost[hostsSplit.length];
for (int x = 0; x < hh.length; x++) {
final URL u = new URL(hostsSplit[x]);
@@ -150,10 +156,22 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
httpClientBuilder.setSSLContext(sslContext);
}
+ CredentialsProvider credentialsProvider = null;
if (username != null && password != null) {
- final CredentialsProvider credentialsProvider = new
BasicCredentialsProvider();
- credentialsProvider.setCredentials(AuthScope.ANY,
- new UsernamePasswordCredentials(username,
password));
+ credentialsProvider = addCredentials(null,
AuthScope.ANY, username, password);
+ }
+
+ if (proxyConfigurationService != null) {
+ final ProxyConfiguration proxyConfiguration =
proxyConfigurationService.getConfiguration();
+ if (Proxy.Type.HTTP ==
proxyConfiguration.getProxyType()) {
+ final HttpHost proxy = new
HttpHost(proxyConfiguration.getProxyServerHost(),
proxyConfiguration.getProxyServerPort(), "http");
+ httpClientBuilder.setProxy(proxy);
+
+ credentialsProvider =
addCredentials(credentialsProvider, new AuthScope(proxy),
proxyConfiguration.getProxyUserName(),
proxyConfiguration.getProxyUserPassword());
+ }
+ }
+
+ if (credentialsProvider != null) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
@@ -168,6 +186,19 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
this.client = builder.build();
}
+ private CredentialsProvider addCredentials(final CredentialsProvider
credentialsProvider, final AuthScope authScope, final String username, final
String password) {
+ final CredentialsProvider cp = credentialsProvider != null ?
credentialsProvider : new BasicCredentialsProvider();
+
+ if (StringUtils.isNotBlank(username) &&
StringUtils.isNotBlank(password)) {
+ cp.setCredentials(
+ authScope == null ? AuthScope.ANY : authScope,
+ new UsernamePasswordCredentials(username, password)
+ );
+ }
+
+ return cp;
+ }
+
private Response runQuery(final String endpoint, final String query, final
String index, final String type, final Map<String, String> requestParameters) {
final StringBuilder sb = new StringBuilder();
if (StringUtils.isNotBlank(index)) {
@@ -183,7 +214,7 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
try {
return performRequest("POST", sb.toString(), requestParameters,
queryEntity);
} catch (final Exception e) {
- throw new ElasticsearchError(e);
+ throw new ElasticsearchException(e);
}
}
@@ -203,7 +234,7 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
throw new IOException(errorMessage);
}
} catch (final Exception ex) {
- throw new ElasticsearchError(ex);
+ throw new ElasticsearchException(ex);
}
}
@@ -301,7 +332,7 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
return IndexOperationResponse.fromJsonResponse(rawResponse);
} catch (final Exception ex) {
- throw new ElasticsearchError(ex);
+ throw new ElasticsearchException(ex);
}
}
@@ -340,7 +371,7 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
parseResponseWarningHeaders(response);
return new
DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
} catch (final Exception ex) {
- throw new RuntimeException(ex);
+ throw new ElasticsearchException(ex);
}
}
@@ -389,8 +420,7 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
return (Map<String, Object>) mapper.readValue(body,
Map.class).get("_source");
} catch (final Exception ex) {
- getLogger().error("", ex);
- return null;
+ throw new ElasticsearchException(ex);
}
}
@@ -414,7 +444,7 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
final Response response = runQuery("_search", query, index, type,
requestParameters);
return buildSearchResponse(response);
} catch (final Exception ex) {
- throw new RuntimeException(ex);
+ throw new ElasticsearchException(ex);
}
}
@@ -425,7 +455,7 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
final Response response = performRequest("POST",
"/_search/scroll", Collections.emptyMap(), scrollEntity);
return buildSearchResponse(response);
} catch (final Exception ex) {
- throw new RuntimeException(ex);
+ throw new ElasticsearchException(ex);
}
}
@@ -447,7 +477,7 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
return (String) mapper.readValue(body, Map.class).get("id");
} catch (final Exception ex) {
- throw new RuntimeException(ex);
+ throw new ElasticsearchException(ex);
}
}
@@ -472,11 +502,10 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
if (404 == re.getResponse().getStatusLine().getStatusCode()) {
getLogger().debug("Point in Time {} not found in Elasticsearch
for deletion, ignoring", pitId);
return new DeleteOperationResponse(0);
- } else {
- throw new RuntimeException(re);
}
+ throw new ElasticsearchException(re);
} catch (final Exception ex) {
- throw new RuntimeException(ex);
+ throw new ElasticsearchException(ex);
}
}
@@ -501,11 +530,10 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
if (404 == re.getResponse().getStatusLine().getStatusCode()) {
getLogger().debug("Scroll Id {} not found in Elasticsearch for
deletion, ignoring", scrollId);
return new DeleteOperationResponse(0);
- } else {
- throw new RuntimeException(re);
}
+ throw new ElasticsearchException(re);
} catch (final Exception ex) {
- throw new RuntimeException(ex);
+ throw new ElasticsearchException(ex);
}
}
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchStringLookupService.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchStringLookupService.java
index c408f5f..3be0aaa 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchStringLookupService.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchStringLookupService.java
@@ -84,7 +84,7 @@ public class ElasticSearchStringLookupService extends
AbstractControllerService
}
@Override
- public Optional<String> lookup(Map<String, Object> coordinates) throws
LookupFailureException {
+ public Optional<String> lookup(final Map<String, Object> coordinates)
throws LookupFailureException {
try {
final String id = (String) coordinates.get(ID);
final Map<String, Object> enums = esClient.get(index, type, id,
null);
@@ -93,7 +93,7 @@ public class ElasticSearchStringLookupService extends
AbstractControllerService
} else {
return Optional.ofNullable(mapper.writeValueAsString(enums));
}
- } catch (IOException e) {
+ } catch (final IOException | ElasticsearchException e) {
throw new LookupFailureException(e);
}
}
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.groovy
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.groovy
index 9cb9885..4c03125 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.groovy
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.groovy
@@ -22,6 +22,7 @@ import org.apache.maven.artifact.versioning.ComparableVersion
import org.apache.nifi.elasticsearch.DeleteOperationResponse
import org.apache.nifi.elasticsearch.ElasticSearchClientService
import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl
+import org.apache.nifi.elasticsearch.ElasticsearchException
import org.apache.nifi.elasticsearch.IndexOperationRequest
import org.apache.nifi.elasticsearch.IndexOperationResponse
import org.apache.nifi.elasticsearch.SearchResponse
@@ -467,21 +468,24 @@ class ElasticSearchClientService_IT {
void testDeleteById() throws Exception {
final String ID = "1"
final def originalDoc = service.get(INDEX, TYPE, ID, null)
- DeleteOperationResponse response = service.deleteById(INDEX, TYPE, ID,
null)
- Assert.assertNotNull(response)
- Assert.assertTrue(response.getTook() > 0)
- def doc = service.get(INDEX, TYPE, ID, null)
- Assert.assertNull(doc)
- doc = service.get(INDEX, TYPE, "2", null)
- Assert.assertNotNull(doc)
-
- // replace the deleted doc
- service.add(new IndexOperationRequest(INDEX, TYPE, "1", originalDoc,
IndexOperationRequest.Operation.Index), null)
- waitForIndexRefresh() // (affects later tests using _search or _bulk)
+ try {
+ DeleteOperationResponse response = service.deleteById(INDEX, TYPE,
ID, null)
+ Assert.assertNotNull(response)
+ Assert.assertTrue(response.getTook() > 0)
+ final ElasticsearchException ee =
Assert.assertThrows(ElasticsearchException.class, { ->
+ service.get(INDEX, TYPE, ID, null) })
+ Assert.assertTrue(ee.isNotFound())
+ final def doc = service.get(INDEX, TYPE, "2", null)
+ Assert.assertNotNull(doc)
+ } finally {
+ // replace the deleted doc
+ service.add(new IndexOperationRequest(INDEX, TYPE, "1",
originalDoc, IndexOperationRequest.Operation.Index), null)
+ waitForIndexRefresh() // (affects later tests using _search or
_bulk)
+ }
}
@Test
- void testGet() throws IOException {
+ void testGet() {
Map old
1.upto(15) { index ->
String id = String.valueOf(index)
@@ -493,6 +497,12 @@ class ElasticSearchClientService_IT {
}
@Test
+ void testGetNotFound() {
+ final ElasticsearchException ee =
Assert.assertThrows(ElasticsearchException.class, { -> service.get(INDEX, TYPE,
"not_found", null) })
+ Assert.assertTrue(ee.isNotFound())
+ }
+
+ @Test
void testSSL() {
final String serviceIdentifier = SSLContextService.class.getName()
final SSLContextService sslContext = mock(SSLContextService.class)
@@ -671,8 +681,10 @@ class ElasticSearchClientService_IT {
deletes.add(new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, null,
IndexOperationRequest.Operation.Delete))
Assert.assertFalse(service.bulk(deletes, [refresh:
"true"]).hasErrors())
waitForIndexRefresh() // wait 1s for index refresh (doesn't prevent
GET but affects later tests using _search or _bulk)
- Assert.assertNull(service.get(INDEX, TYPE, TEST_ID, null))
- Assert.assertNull(service.get(INDEX, TYPE, UPSERTED_ID, null))
+ ElasticsearchException ee =
Assert.assertThrows(ElasticsearchException.class, { -> service.get(INDEX, TYPE,
TEST_ID, null) })
+ Assert.assertTrue(ee.isNotFound())
+ ee = Assert.assertThrows(ElasticsearchException.class, { ->
service.get(INDEX, TYPE, UPSERTED_ID, null) })
+ Assert.assertTrue(ee.isNotFound())
}
@Test
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java
index 90bd461..2336a9c 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java
@@ -20,6 +20,7 @@ package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
import org.apache.nifi.elasticsearch.OperationResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@@ -56,6 +57,7 @@ public abstract class AbstractByQueryElasticsearch extends
AbstractProcessor imp
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_FAILURE);
+ rels.add(REL_RETRY);
relationships = Collections.unmodifiableSet(rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
@@ -138,12 +140,21 @@ public abstract class AbstractByQueryElasticsearch
extends AbstractProcessor imp
input = session.putAllAttributes(input, attrs);
session.transfer(input, REL_SUCCESS);
+ } catch (final ElasticsearchException ese) {
+ final String msg = String.format("Encountered a server-side
problem with Elasticsearch. %s",
+ ese.isElastic() ? "Routing to retry." : "Routing to
failure");
+ getLogger().error(msg, ese);
+ if (input != null) {
+ session.penalize(input);
+ input = session.putAttribute(input, getErrorAttribute(),
ese.getMessage());
+ session.transfer(input, ese.isElastic() ? REL_RETRY :
REL_FAILURE);
+ }
} catch (final Exception e) {
+ getLogger().error("Error running \"by query\" operation: ", e);
if (input != null) {
input = session.putAttribute(input, getErrorAttribute(),
e.getMessage());
session.transfer(input, REL_FAILURE);
}
- getLogger().error("Error running \"by query\" operation: ", e);
context.yield();
}
}
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
index 7f1bb31..8af8770 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
@@ -22,6 +22,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
import org.apache.nifi.elasticsearch.SearchResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@@ -168,9 +169,19 @@ public abstract class AbstractJsonQueryElasticsearch<Q
extends JsonQueryParamete
final SearchResponse response = doQuery(queryJsonParameters,
hitsFlowFiles, session, context, input, stopWatch);
finishQuery(input, queryJsonParameters, session, context,
response);
+ } catch (final ElasticsearchException ese) {
+ final String msg = String.format("Encountered a server-side
problem with Elasticsearch. %s",
+ ese.isElastic() ? "Routing to retry." : "Routing to
failure");
+ getLogger().error(msg, ese);
+ if (input != null) {
+ session.penalize(input);
+ input = session.putAttribute(input,
"elasticsearch.query.error", ese.getMessage());
+ session.transfer(input, ese.isElastic() ? REL_RETRY :
REL_FAILURE);
+ }
} catch (Exception ex) {
- getLogger().error("Error processing flowfile.", ex);
+ getLogger().error("Could not query documents.", ex);
if (input != null) {
+ input = session.putAttribute(input,
"elasticsearch.query.error", ex.getMessage());
session.transfer(input, REL_FAILURE);
}
context.yield();
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
index 18d5de8..c6fcbee 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
@@ -34,7 +34,7 @@ import java.util.Map;
@WritesAttribute(attribute = "elasticsearch.delete.error", description
= "The error message provided by Elasticsearch if there is an error running the
delete.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
-@Tags({ "elastic", "elasticsearch", "delete", "query"})
+@Tags({ "elastic", "elasticsearch", "elasticsearch5", "elasticsearch6",
"elasticsearch7", "delete", "query"})
@CapabilityDescription("Delete from an Elasticsearch index using a query. The
query can be loaded from a flowfile body " +
"or from the Query parameter.")
@DynamicProperty(
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
new file mode 100644
index 0000000..e416281
--- /dev/null
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Fetches a single document from Elasticsearch by its {@code _id} using the configured
+ * {@link ElasticSearchClientService} and outputs it either as FlowFile content or as a FlowFile attribute.
+ *
+ * <p>Input is allowed but not required: when no FlowFile is available a new one is created to carry a
+ * successfully fetched document. Failure, retry and not-found routing only applies when there is an
+ * input FlowFile; without one the problem is only logged.</p>
+ */
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "get", "fetch", "read"})
+@CapabilityDescription("Elasticsearch get processor that uses the official Elastic REST client libraries. " +
+        "Note that the full body of the document will be read into memory before being written to a FlowFile for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "elasticsearch.index", description = "The Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "elasticsearch.type", description = "The Elasticsearch document type"),
+        @WritesAttribute(attribute = "elasticsearch.get.error", description = "The error message provided by Elasticsearch if there is an error fetching the document.")
+})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing.")
+public class GetElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
+    // Allowable values of the Destination property.
+    static final AllowableValue FLOWFILE_CONTENT = new AllowableValue(
+            "flowfile-content",
+            "FlowFile Content",
+            "Output the retrieved document as the FlowFile content."
+    );
+
+    static final AllowableValue FLOWFILE_ATTRIBUTE = new AllowableValue(
+            "flowfile-attribute",
+            "FlowFile Attribute",
+            "Output the retrieved document as a FlowFile attribute specified by the Attribute Name."
+    );
+
+    static final PropertyDescriptor ID = new PropertyDescriptor.Builder()
+            .name("get-es-id")
+            .displayName("Document Id")
+            .description("The _id of the document to retrieve.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
+            .name("get-es-destination")
+            .displayName("Destination")
+            .description("Indicates whether the retrieved document is written to the FlowFile content or a FlowFile attribute.")
+            .required(true)
+            .allowableValues(FLOWFILE_CONTENT, FLOWFILE_ATTRIBUTE)
+            .defaultValue(FLOWFILE_CONTENT.getValue())
+            .build();
+
+    // Only applicable when Destination is "FlowFile Attribute" (see dependsOn below).
+    static final PropertyDescriptor ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+            .name("get-es-attribute-name")
+            .displayName("Attribute Name")
+            .description("The name of the FlowFile attribute to use for the retrieved document output.")
+            .required(true)
+            .defaultValue("elasticsearch.doc")
+            .dependsOn(DESTINATION, FLOWFILE_ATTRIBUTE)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final Relationship REL_DOC = new Relationship.Builder().name("document")
+            .description("Fetched documents are routed to this relationship.")
+            .build();
+
+    static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not_found")
+            .description("A FlowFile is routed to this relationship if the specified document does not exist in the Elasticsearch cluster.")
+            .build();
+
+    // INDEX, TYPE, CLIENT_SERVICE, REL_FAILURE and REL_RETRY are not declared in this class;
+    // presumably they are inherited from ElasticsearchRestProcessor.
+    static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+            ID, INDEX, TYPE, DESTINATION, ATTRIBUTE_NAME, CLIENT_SERVICE
+    ));
+    static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_DOC, REL_FAILURE, REL_RETRY, REL_NOT_FOUND
+    )));
+
+    // Resolved from the CLIENT_SERVICE property each time the processor is scheduled.
+    private volatile ElasticSearchClientService clientService;
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    /**
+     * Builds the descriptor for a user-defined (dynamic) property; per the class-level
+     * {@code @DynamicProperty} annotation these are used as URL query parameters.
+     *
+     * @param propertyDescriptorName the name given to the dynamic property
+     * @return an optional, non-empty, expression-language enabled dynamic property descriptor
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .dynamic(true)
+                .build();
+    }
+
+    /**
+     * Looks up the Elasticsearch client controller service configured on this processor.
+     *
+     * @param context the process context providing the CLIENT_SERVICE property
+     */
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
+    }
+
+    /**
+     * Fetches the document identified by the evaluated Id/Index/Type properties and routes the result.
+     *
+     * <ul>
+     *   <li>Success: the document is serialised to JSON, written to the content or to the configured
+     *       attribute, and the FlowFile goes to {@code REL_DOC}.</li>
+     *   <li>{@link ElasticsearchException} flagged as not found: the input goes to {@code REL_NOT_FOUND}
+     *       (or a warning is logged when there is no input).</li>
+     *   <li>Any other {@link ElasticsearchException}: the input is penalized and routed to
+     *       {@code REL_RETRY} when {@code isElastic()} is true, otherwise to {@code REL_FAILURE}.</li>
+     *   <li>Any other exception: the input goes to {@code REL_FAILURE} and the processor yields.</li>
+     * </ul>
+     *
+     * @param context the process context used to evaluate properties
+     * @param session the session used to obtain, create, modify and transfer FlowFiles
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        // May be null: the processor is allowed to run without incoming FlowFiles.
+        FlowFile input = session.get();
+
+        final String id = context.getProperty(ID).evaluateAttributeExpressions(input).getValue();
+        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+        final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+
+        final String destination = context.getProperty(DESTINATION).getValue();
+        final String attributeName = context.getProperty(ATTRIBUTE_NAME).evaluateAttributeExpressions(input).getValue();
+
+        try {
+            final StopWatch stopWatch = new StopWatch(true);
+            final Map<String, Object> doc = clientService.get(index, type, id, getUrlQueryParameters(context, input));
+
+            // At most four entries: filename, index, type and (optionally) the document attribute.
+            final Map<String, String> attributes = new HashMap<>(4, 1);
+            attributes.put("filename", id);
+            attributes.put("elasticsearch.index", index);
+            if (type != null) {
+                attributes.put("elasticsearch.type", type);
+            }
+            final String json = mapper.writeValueAsString(doc);
+            FlowFile documentFlowFile = input != null ? input : session.create();
+            if (FLOWFILE_CONTENT.getValue().equals(destination)) {
+                // Encode explicitly as UTF-8 so the JSON content does not depend on the JVM's default charset.
+                documentFlowFile = session.write(documentFlowFile,
+                        out -> out.write(json.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+            } else {
+                attributes.put(attributeName, json);
+            }
+
+            documentFlowFile = session.putAllAttributes(documentFlowFile, attributes);
+            session.getProvenanceReporter().receive(documentFlowFile, clientService.getTransitUrl(index, type), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.transfer(documentFlowFile, REL_DOC);
+        } catch (final ElasticsearchException ese) {
+            if (ese.isNotFound()) {
+                if (input != null) {
+                    session.transfer(input, REL_NOT_FOUND);
+                } else {
+                    // No FlowFile to route, so the missing document is only reported in the log.
+                    getLogger().warn("Document with _id {} not found in index {} (and type {})", id, index, type);
+                }
+            } else {
+                final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
+                        ese.isElastic() ? "Routing to retry." : "Routing to failure");
+                getLogger().error(msg, ese);
+                if (input != null) {
+                    session.penalize(input);
+                    input = session.putAttribute(input, "elasticsearch.get.error", ese.getMessage());
+                    session.transfer(input, ese.isElastic() ? REL_RETRY : REL_FAILURE);
+                }
+            }
+        } catch (final Exception ex) {
+            getLogger().error("Could not fetch document.", ex);
+            if (input != null) {
+                input = session.putAttribute(input, "elasticsearch.get.error", ex.getMessage());
+                session.transfer(input, REL_FAILURE);
+            }
+            context.yield();
+        }
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
index a460c3a..a0bce42 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
@@ -39,11 +39,12 @@ import java.util.concurrent.TimeUnit;
@WritesAttribute(attribute = "mime.type", description =
"application/json"),
@WritesAttribute(attribute = "aggregation.name", description = "The name
of the aggregation whose results are in the output flowfile"),
@WritesAttribute(attribute = "aggregation.number", description = "The
number of the aggregation whose results are in the output flowfile"),
- @WritesAttribute(attribute = "hit.count", description = "The number of
hits that are in the output flowfile")
+ @WritesAttribute(attribute = "hit.count", description = "The number of
hits that are in the output flowfile"),
+ @WritesAttribute(attribute = "elasticsearch.query.error", description =
"The error message provided by Elasticsearch if there is an error querying the
index.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@EventDriven
-@Tags({"elasticsearch", "elasticsearch 5", "query", "read", "get", "json"})
+@Tags({"elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7",
"query", "read", "get", "json"})
@CapabilityDescription("A processor that allows the user to run a query (with
aggregations) written with the " +
"Elasticsearch JSON DSL. It does not automatically paginate queries
for the user. If an incoming relationship is added to this " +
"processor, it will use the flowfile's content for the query. Care
should be taken on the size of the query because the entire response " +
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
index 701d43c..f2384b6 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
@@ -42,11 +42,12 @@ import java.util.List;
@WritesAttribute(attribute = "aggregation.name", description = "The name
of the aggregation whose results are in the output flowfile"),
@WritesAttribute(attribute = "aggregation.number", description = "The
number of the aggregation whose results are in the output flowfile"),
@WritesAttribute(attribute = "page.number", description = "The number of
the page (request) in which the results were returned that are in the output
flowfile"),
- @WritesAttribute(attribute = "hit.count", description = "The number of
hits that are in the output flowfile")
+ @WritesAttribute(attribute = "hit.count", description = "The number of
hits that are in the output flowfile"),
+ @WritesAttribute(attribute = "elasticsearch.query.error", description =
"The error message provided by Elasticsearch if there is an error querying the
index.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@EventDriven
-@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "read",
"json"})
+@Tags({"elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7",
"query", "scroll", "page", "read", "json"})
@CapabilityDescription("A processor that allows the user to run a paginated
query (with aggregations) written with the Elasticsearch JSON DSL. " +
"It will use the flowfile's content for the query unless the QUERY
attribute is populated. " +
"Search After/Point in Time queries must include a valid \"sort\"
field.")
@@ -58,7 +59,7 @@ import java.util.List;
"These parameters will override any matching parameters in the
query request body")
@SystemResourceConsideration(resource = SystemResource.MEMORY, description =
"Care should be taken on the size of each page because each response " +
"from Elasticsearch will be loaded into memory all at once and
converted into the resulting flowfiles.")
-public class PaginatedJsonQueryElasticsearch extends
AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {
+public class PaginatedJsonQueryElasticsearch extends
AbstractPaginatedJsonQueryElasticsearch {
private static final List<PropertyDescriptor> propertyDescriptors;
static {
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index ec3c3f2..6fd1ddb 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -21,6 +21,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -30,7 +32,7 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
-import org.apache.nifi.elasticsearch.ElasticsearchError;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
import org.apache.nifi.elasticsearch.IndexOperationRequest;
import org.apache.nifi.elasticsearch.IndexOperationResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -72,14 +74,17 @@ import java.util.Optional;
import java.util.Set;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "put",
"index", "record"})
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6",
"elasticsearch7", "put", "index", "record"})
@CapabilityDescription("A record-aware Elasticsearch put processor that uses
the official Elastic REST client libraries.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "elasticsearch.put.error", description =
"The error message provided by Elasticsearch if there is an error indexing the
documents.")
+})
@DynamicProperty(
name = "The name of a URL query parameter to add",
value = "The value of the URL query parameter",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description = "Adds the specified property name/value as a query
parameter in the Elasticsearch URL used for processing. " +
- "These parameters will override any matching parameters in the
query request body")
+ "These parameters will override any matching parameters in the
_bulk request body")
public class PutElasticsearchRecord extends AbstractProcessor implements
ElasticsearchRestProcessor {
static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
.name("put-es-record-reader")
@@ -346,7 +351,7 @@ public class PutElasticsearchRecord extends
AbstractProcessor implements Elastic
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) {
- final FlowFile input = session.get();
+ FlowFile input = session.get();
if (input == null) {
return;
}
@@ -415,18 +420,21 @@ public class PutElasticsearchRecord extends
AbstractProcessor implements Elastic
badRecords.add(bad);
}
}
- } catch (final ElasticsearchError ese) {
+ } catch (final ElasticsearchException ese) {
final String msg = String.format("Encountered a server-side
problem with Elasticsearch. %s",
- ese.isElastic() ? "Moving to retry." : "Moving to
failure");
+ ese.isElastic() ? "Routing to retry." : "Routing to
failure");
getLogger().error(msg, ese);
final Relationship rel = ese.isElastic() ? REL_RETRY : REL_FAILURE;
session.penalize(input);
+ input = session.putAttribute(input, "elasticsearch.put.error",
ese.getMessage());
session.transfer(input, rel);
removeBadRecordFlowFiles(badRecords, session);
return;
} catch (final Exception ex) {
getLogger().error("Could not index documents.", ex);
+ input = session.putAttribute(input, "elasticsearch.put.error",
ex.getMessage());
session.transfer(input, REL_FAILURE);
+ context.yield();
removeBadRecordFlowFiles(badRecords, session);
return;
}
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
index 073f779..93adcfe 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
@@ -55,13 +55,14 @@ import java.util.Set;
@WritesAttribute(attribute = "aggregation.name", description = "The name
of the aggregation whose results are in the output flowfile"),
@WritesAttribute(attribute = "aggregation.number", description = "The
number of the aggregation whose results are in the output flowfile"),
@WritesAttribute(attribute = "page.number", description = "The number of
the page (request) in which the results were returned that are in the output
flowfile"),
- @WritesAttribute(attribute = "hit.count", description = "The number of
hits that are in the output flowfile")
+ @WritesAttribute(attribute = "hit.count", description = "The number of
hits that are in the output flowfile"),
+ @WritesAttribute(attribute = "elasticsearch.query.error", description =
"The error message provided by Elasticsearch if there is an error querying the
index.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@TriggerSerially
@PrimaryNodeOnly
@DefaultSchedule(period="1 min")
-@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page",
"search", "json"})
+@Tags({"elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7",
"query", "scroll", "page", "search", "json"})
@CapabilityDescription("A processor that allows the user to repeatedly run a
paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
"Search After/Point in Time queries must include a valid \"sort\"
field. The processor will retrieve multiple pages of results " +
"until either no more results are available or the Pagination Keep
Alive expiration is reached, after which the query will " +
@@ -77,7 +78,7 @@ import java.util.Set;
"(when the current time is later than the last query execution plus
the Pagination Keep Alive interval).")
@SystemResourceConsideration(resource = SystemResource.MEMORY, description =
"Care should be taken on the size of each page because each response " +
"from Elasticsearch will be loaded into memory all at once and
converted into the resulting flowfiles.")
-public class SearchElasticsearch extends
AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {
+public class SearchElasticsearch extends
AbstractPaginatedJsonQueryElasticsearch {
static final String STATE_SCROLL_ID = "scrollId";
static final String STATE_PIT_ID = "pitId";
static final String STATE_SEARCH_AFTER = "searchAfter";
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java
index ba98025..a64baba 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java
@@ -34,7 +34,7 @@ import java.util.Map;
@WritesAttribute(attribute = "elasticsearch.update.error", description
= "The error message provided by Elasticsearch if there is an error running the
update.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
-@Tags({ "elastic", "elasticsearch", "update", "query"})
+@Tags({ "elastic", "elasticsearch", "elasticsearch5", "elasticsearch6",
"elasticsearch7", "update", "query"})
@CapabilityDescription("Update documents in an Elasticsearch index using a
query. The query can be loaded from a flowfile body " +
"or from the Query parameter.")
@DynamicProperty(
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 749ba53..69bbf40 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -19,3 +19,4 @@
org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch
org.apache.nifi.processors.elasticsearch.SearchElasticsearch
org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord
org.apache.nifi.processors.elasticsearch.UpdateByQueryElasticsearch
+org.apache.nifi.processors.elasticsearch.GetElasticsearch
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
index 51fdec1..3ae81a2 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
@@ -279,7 +279,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends
AbstractJsonQueryEla
runOnce(runner)
- final TestElasticsearchClientService service =
runner.getControllerService("esService") as TestElasticsearchClientService
+ final TestElasticsearchClientService service = getService(runner)
if (getProcessor() instanceof SearchElasticsearch || getProcessor()
instanceof PaginatedJsonQueryElasticsearch) {
Assert.assertEquals(3, service.getRequestParameters().size())
Assert.assertEquals("600s",
service.getRequestParameters().get("scroll"))
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
index afffcb2..274e6a2 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
@@ -178,7 +178,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest
extends AbstractJsonQ
// check error was caught and logged
assertThat(runner.getLogger().getErrorMessages().stream()
.anyMatch({ logMessage ->
- logMessage.getMsg().contains("Error processing flowfile")
&&
+ logMessage.getMsg().contains("Could not query documents")
&&
logMessage.getThrowable().getMessage() ==
"Simulated IOException - initialisePointInTime"
}),
is(true)
@@ -199,7 +199,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest
extends AbstractJsonQ
// check error was caught and logged
assertThat(runner.getLogger().getErrorMessages().stream()
.anyMatch({ logMessage ->
- logMessage.getMsg().contains("Error processing flowfile")
&&
+ logMessage.getMsg().contains("Could not query documents")
&&
logMessage.getThrowable().getMessage() == "Query
using pit/search_after must contain a \"sort\" field"
}),
is(true)
@@ -213,7 +213,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest
extends AbstractJsonQ
testCounts(runner, 0, 0, isInput() ? 1 : 0, 0)
assertThat(runner.getLogger().getErrorMessages().stream()
.anyMatch({ logMessage ->
- logMessage.getMsg().contains("Error processing flowfile")
&&
+ logMessage.getMsg().contains("Could not query documents")
&&
logMessage.getThrowable().getMessage() == "Query
using pit/search_after must contain a \"sort\" field"
}),
is(true)
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy
new file mode 100644
index 0000000..5ffe818
--- /dev/null
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.provenance.ProvenanceEventType
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.hamcrest.MatcherAssert
+import org.junit.Assert
+import org.junit.Test
+
+import static groovy.json.JsonOutput.toJson
+import static org.hamcrest.CoreMatchers.equalTo
+import static org.hamcrest.CoreMatchers.is
+import static org.hamcrest.MatcherAssert.assertThat
+import static org.junit.Assert.assertThrows
+
+/**
+ * Unit tests for {@link GetElasticsearch}, driven through a NiFi {@link TestRunner}
+ * that is wired to the {@link TestElasticsearchClientService} stub registered as
+ * "esService" (see {@link #createRunner()}).
+ */
+class GetElasticsearchTest {
+    static final String INDEX_NAME = "messages"
+
+    /** Removing every property must report exactly the three required ones as missing. */
+    @Test
+    void testMandatoryProperties() {
+        final TestRunner runner = createRunner()
+        runner.removeProperty(GetElasticsearch.CLIENT_SERVICE)
+        runner.removeProperty(GetElasticsearch.INDEX)
+        runner.removeProperty(GetElasticsearch.TYPE)
+        runner.removeProperty(GetElasticsearch.ID)
+        runner.removeProperty(GetElasticsearch.DESTINATION)
+        runner.removeProperty(GetElasticsearch.ATTRIBUTE_NAME)
+
+        final AssertionError assertionError = assertThrows(AssertionError.class, runner.&run)
+        assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 3 validation failures:\n" +
+                "'%s' is invalid because %s is required\n" +
+                "'%s' is invalid because %s is required\n" +
+                "'%s' is invalid because %s is required\n",
+                GetElasticsearch.ID.getDisplayName(), GetElasticsearch.ID.getDisplayName(),
+                GetElasticsearch.INDEX.getDisplayName(), GetElasticsearch.INDEX.getDisplayName(),
+                GetElasticsearch.CLIENT_SERVICE.getDisplayName(), GetElasticsearch.CLIENT_SERVICE.getDisplayName()
+        )))
+    }
+
+    /** Empty or unknown property values must each surface as a validation failure. */
+    @Test
+    void testInvalidProperties() {
+        final TestRunner runner = createRunner()
+        runner.setProperty(GetElasticsearch.CLIENT_SERVICE, "not-a-service")
+        runner.setProperty(GetElasticsearch.INDEX, "")
+        runner.setProperty(GetElasticsearch.TYPE, "")
+        runner.setProperty(GetElasticsearch.ID, "")
+        runner.setProperty(GetElasticsearch.DESTINATION, "not-valid")
+        runner.setProperty(GetElasticsearch.ATTRIBUTE_NAME, "")
+
+        final AssertionError assertionError = assertThrows(AssertionError.class, runner.&run)
+        assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 6 validation failures:\n" +
+                "'%s' validated against '' is invalid because %s cannot be empty\n" +
+                "'%s' validated against '' is invalid because %s cannot be empty\n" +
+                "'%s' validated against '' is invalid because %s cannot be empty\n" +
+                "'%s' validated against 'not-valid' is invalid because Given value not found in allowed set '%s'\n" +
+                "'%s' validated against 'not-a-service' is invalid because Property references a Controller Service that does not exist\n" +
+                "'%s' validated against 'not-a-service' is invalid because Invalid Controller Service: not-a-service is not a valid Controller Service Identifier\n",
+                GetElasticsearch.ID.getName(), GetElasticsearch.ID.getName(),
+                GetElasticsearch.INDEX.getName(), GetElasticsearch.INDEX.getName(),
+                GetElasticsearch.TYPE.getName(), GetElasticsearch.TYPE.getName(),
+                GetElasticsearch.DESTINATION.getName(), [GetElasticsearch.FLOWFILE_CONTENT.getValue(), GetElasticsearch.FLOWFILE_ATTRIBUTE.getValue()].join(", "),
+                GetElasticsearch.CLIENT_SERVICE.getDisplayName(),
+                GetElasticsearch.CLIENT_SERVICE.getDisplayName()
+        )))
+    }
+
+    /** An empty attribute name is rejected once the destination is a FlowFile attribute. */
+    @Test
+    void testInvalidAttributeName() {
+        final TestRunner runner = createRunner()
+        runner.setProperty(GetElasticsearch.DESTINATION, GetElasticsearch.FLOWFILE_ATTRIBUTE)
+        runner.setProperty(GetElasticsearch.ATTRIBUTE_NAME, "")
+
+        final AssertionError assertionError = assertThrows(AssertionError.class, runner.&run)
+        assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 1 validation failures:\n" +
+                "'%s' validated against '' is invalid because %s cannot be empty\n",
+                GetElasticsearch.ATTRIBUTE_NAME.getName(), GetElasticsearch.ATTRIBUTE_NAME.getName()
+        )))
+    }
+
+    /** A successful fetch yields one document FlowFile and exactly one RECEIVE provenance event for it. */
+    @Test
+    void testFetch() {
+        final TestRunner runner = createRunner()
+
+        runProcessor(runner)
+
+        testCounts(runner, 1, 0, 0, 0)
+        // MockFlowFile (not FlowFile) is the type that actually declares getContent()
+        final MockFlowFile doc = runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).get(0)
+        assertOutputContent(doc.getContent())
+        assertThat(
+                runner.getProvenanceEvents().stream().filter({ pe ->
+                    pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                            pe.getAttribute("uuid") == doc.getAttribute("uuid")
+                }).count(),
+                is(1L)
+        )
+    }
+
+    /** With the content destination, the document replaces the FlowFile content both with and without input. */
+    @Test
+    void testInputHandlingDestinationContent() {
+        final TestRunner runner = createRunner()
+        runner.setProperty(GetElasticsearch.DESTINATION, GetElasticsearch.FLOWFILE_CONTENT)
+
+        // first pass: triggered by an enqueued FlowFile
+        runner.setIncomingConnection(true)
+        runProcessor(runner)
+        testCounts(runner, 1, 0, 0, 0)
+        MockFlowFile doc = runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).get(0)
+        assertOutputContent(doc.getContent())
+        assertCommonAttributes(doc)
+        assertOutputAttribute(doc)
+        reset(runner)
+
+        // second pass: no incoming connection and nothing enqueued
+        runner.setIncomingConnection(false)
+        runner.run()
+        testCounts(runner, 1, 0, 0, 0)
+        doc = runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).get(0)
+        assertOutputContent(doc.getContent())
+        assertCommonAttributes(doc)
+        assertOutputAttribute(doc)
+    }
+
+    /** With the attribute destination, the document lands in an attribute and the content is left as it was. */
+    @Test
+    void testDestinationAttribute() {
+        final TestRunner runner = createRunner()
+        runner.setProperty(GetElasticsearch.DESTINATION, GetElasticsearch.FLOWFILE_ATTRIBUTE)
+
+        runner.setIncomingConnection(true)
+        runProcessor(runner)
+        testCounts(runner, 1, 0, 0, 0)
+        MockFlowFile doc = runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).get(0)
+        // "test" is the content enqueued by runProcessor
+        assertThat(doc.getContent(), is("test"))
+        assertCommonAttributes(doc)
+        assertOutputAttribute(doc, true)
+        reset(runner)
+
+        // non-default attribute name and fetch without type
+        runner.setProperty(GetElasticsearch.ATTRIBUTE_NAME, "my_attr")
+        runner.removeProperty(GetElasticsearch.TYPE)
+        runner.setIncomingConnection(false)
+        runner.run()
+        testCounts(runner, 1, 0, 0, 0)
+        doc = runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).get(0)
+        assertThat(doc.getContent(), is(""))
+        assertCommonAttributes(doc, false)
+        assertOutputAttribute(doc, true, "my_attr")
+    }
+
+    /** A client error sends the input FlowFile to failure; without input nothing is transferred. */
+    @Test
+    void testErrorDuringFetch() {
+        final TestRunner runner = createRunner()
+        getService(runner).setThrowErrorInGet(true)
+
+        runner.setIncomingConnection(true)
+        runProcessor(runner)
+        testCounts(runner, 0, 1, 0, 0)
+        reset(runner)
+
+        runner.setIncomingConnection(false)
+        runner.run()
+        testCounts(runner, 0, 0, 0, 0)
+    }
+
+    /** A not-found response from the client routes the input FlowFile to the not-found relationship. */
+    @Test
+    void testNotFound() {
+        final TestRunner runner = createRunner()
+        getService(runner).setThrowNotFoundInGet(true)
+
+        runProcessor(runner)
+        testCounts(runner, 0, 0, 0, 1)
+    }
+
+    /** Dynamic properties are handed to the client service as request parameters, with expressions evaluated. */
+    @Test
+    void testRequestParameters() {
+        final TestRunner runner = createRunner()
+        runner.setProperty("refresh", "true")
+        runner.setProperty("_source", '${source}')
+        runner.setVariable("source", "msg")
+
+        runProcessor(runner)
+
+        final TestElasticsearchClientService service = getService(runner)
+        Assert.assertEquals(2, service.getRequestParameters().size())
+        Assert.assertEquals("true", service.getRequestParameters().get("refresh"))
+        Assert.assertEquals("msg", service.getRequestParameters().get("_source"))
+    }
+
+    /** Asserts the number of FlowFiles transferred to each of the four relationships. */
+    private static void testCounts(final TestRunner runner, final int doc, final int failure, final int retry, final int notFound) {
+        runner.assertTransferCount(GetElasticsearch.REL_DOC, doc)
+        runner.assertTransferCount(GetElasticsearch.REL_FAILURE, failure)
+        runner.assertTransferCount(GetElasticsearch.REL_RETRY, retry)
+        runner.assertTransferCount(GetElasticsearch.REL_NOT_FOUND, notFound)
+    }
+
+    /** Asserts that the given content is the JSON form of the stub document. */
+    private static void assertOutputContent(final String content) {
+        assertThat(content, is(toJson(["msg": "one"])))
+    }
+
+    /**
+     * Asserts that attribute {@code attr} holds the stub document as JSON when
+     * {@code attributeOutput} is true, and is absent otherwise.
+     */
+    private static void assertOutputAttribute(final MockFlowFile doc, final boolean attributeOutput = false, final String attr = "elasticsearch.doc") {
+        if (attributeOutput) {
+            doc.assertAttributeEquals(attr, toJson(["msg": "one"]))
+        } else {
+            doc.assertAttributeNotExists(attr)
+        }
+    }
+
+    /**
+     * Asserts the filename and index attributes, plus the presence or absence of the
+     * type and error attributes according to the {@code type} and {@code error} flags.
+     */
+    private static void assertCommonAttributes(final MockFlowFile doc, final boolean type = true, final boolean error = false) {
+        doc.assertAttributeEquals("filename", "doc_1")
+        doc.assertAttributeEquals("elasticsearch.index", INDEX_NAME)
+        if (type) {
+            doc.assertAttributeEquals("elasticsearch.type", "message")
+        } else {
+            doc.assertAttributeNotExists("elasticsearch.type")
+        }
+
+        if (error) {
+            doc.assertAttributeEquals("elasticsearch.get.error", "message")
+        } else {
+            doc.assertAttributeNotExists("elasticsearch.get.error")
+        }
+    }
+
+    /** Builds a runner with the stub client service enabled and every processor property set to a valid value. */
+    private static TestRunner createRunner() {
+        final GetElasticsearch processor = new GetElasticsearch()
+        final TestRunner runner = TestRunners.newTestRunner(processor)
+        final TestElasticsearchClientService service = new TestElasticsearchClientService(false)
+        runner.addControllerService("esService", service)
+        runner.enableControllerService(service)
+        runner.setProperty(GetElasticsearch.CLIENT_SERVICE, "esService")
+        runner.setProperty(GetElasticsearch.INDEX, INDEX_NAME)
+        runner.setProperty(GetElasticsearch.TYPE, "message")
+        runner.setProperty(GetElasticsearch.ID, "doc_1")
+        runner.setProperty(GetElasticsearch.DESTINATION, GetElasticsearch.FLOWFILE_CONTENT)
+        runner.setProperty(GetElasticsearch.ATTRIBUTE_NAME, "elasticsearch.doc")
+        runner.setValidateExpressionUsage(true)
+
+        return runner
+    }
+
+    /** Enqueues a FlowFile with content "test", runs the processor once and returns the enqueued FlowFile. */
+    private static MockFlowFile runProcessor(final TestRunner runner) {
+        final MockFlowFile ff = runner.enqueue("test")
+        runner.run()
+        return ff
+    }
+
+    /** Looks up the stub client service registered by {@link #createRunner()}. */
+    private static TestElasticsearchClientService getService(final TestRunner runner) {
+        return runner.getControllerService("esService", TestElasticsearchClientService.class)
+    }
+
+    /** Clears provenance events and transfer state so the same runner can be triggered again. */
+    private static void reset(final TestRunner runner) {
+        runner.clearProvenanceEvents()
+        runner.clearTransferState()
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy
index c606d36..5fc0647 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy
@@ -64,7 +64,7 @@ class SearchElasticsearchTest extends
AbstractPaginatedJsonQueryElasticsearchTes
testCounts(runner, 0, 0, 0, 0)
assertThat(runner.getLogger().getErrorMessages().stream()
.anyMatch({ logMessage ->
- logMessage.getMsg().contains("Error processing flowfile")
&&
+ logMessage.getMsg().contains("Could not query documents")
&&
logMessage.getThrowable().getMessage() ==
"Simulated IOException - scroll"
}),
is(true)
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy
index 9bd4cb6..7410380 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy
@@ -21,14 +21,20 @@ import groovy.json.JsonSlurper
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.elasticsearch.DeleteOperationResponse
import org.apache.nifi.elasticsearch.ElasticSearchClientService
+import org.apache.nifi.elasticsearch.ElasticsearchException
import org.apache.nifi.elasticsearch.IndexOperationRequest
import org.apache.nifi.elasticsearch.IndexOperationResponse
import org.apache.nifi.elasticsearch.SearchResponse
import org.apache.nifi.elasticsearch.UpdateOperationResponse
+import org.apache.nifi.processors.elasticsearch.mock.MockElasticsearchException
+import org.elasticsearch.client.Response
+import org.elasticsearch.client.ResponseException
class TestElasticsearchClientService extends AbstractControllerService
implements ElasticSearchClientService {
private boolean returnAggs
private boolean throwErrorInSearch
+ private boolean throwErrorInGet
+ private boolean throwNotFoundInGet
private boolean throwErrorInDelete
private boolean throwErrorInPit
private boolean throwErrorInUpdate
@@ -43,7 +49,11 @@ class TestElasticsearchClientService extends
AbstractControllerService implement
private void common(boolean throwError, Map<String, String>
requestParameters) {
if (throwError) {
- throw new IOException("Simulated IOException")
+ if (throwNotFoundInGet) {
+ throw new MockElasticsearchException(false, true)
+ } else {
+ throw new IOException("Simulated IOException")
+ }
}
this.requestParameters = requestParameters
}
@@ -89,7 +99,7 @@ class TestElasticsearchClientService extends
AbstractControllerService implement
@Override
Map<String, Object> get(String index, String type, String id, Map<String,
String> requestParameters) {
- common(false, requestParameters)
+ common(throwErrorInGet || throwNotFoundInGet, requestParameters)
return [ "msg": "one" ]
}
@@ -298,6 +308,14 @@ class TestElasticsearchClientService extends
AbstractControllerService implement
" }\n" +
" ]"
+ /**
+  * Sets the flag that makes get() fail with a MockElasticsearchException
+  * constructed as (elastic = false, notFound = true).
+  */
+ void setThrowNotFoundInGet(boolean throwNotFoundInGet) {
+ this.throwNotFoundInGet = throwNotFoundInGet
+ }
+
+ /**
+  * Sets the flag that makes get() fail with the simulated IOException
+  * (the not-found flag, when also set, takes precedence).
+  */
+ void setThrowErrorInGet(boolean throwErrorInGet) {
+ this.throwErrorInGet = throwErrorInGet
+ }
+
void setThrowErrorInSearch(boolean throwErrorInSearch) {
this.throwErrorInSearch = throwErrorInSearch
}
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
index 21a3a8f..7d63ec5 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
@@ -29,9 +29,9 @@ class MockBulkLoadClientService extends
AbstractMockElasticsearchClient {
@Override
IndexOperationResponse bulk(List<IndexOperationRequest> items, Map<String,
String> requestParameters) {
if (throwRetriableError) {
- throw new MockElasticsearchError(true)
+ throw new MockElasticsearchException(true, false)
} else if (throwFatalError) {
- throw new MockElasticsearchError(false)
+ throw new MockElasticsearchException(false, false)
}
if (evalClosure) {
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchError.groovy
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchException.groovy
similarity index 75%
rename from
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchError.groovy
rename to
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchException.groovy
index 7b1073b..90f8bce 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchError.groovy
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchException.groovy
@@ -17,15 +17,16 @@
package org.apache.nifi.processors.elasticsearch.mock
-import org.apache.nifi.elasticsearch.ElasticsearchError
+import org.apache.nifi.elasticsearch.ElasticsearchException
-class MockElasticsearchError extends ElasticsearchError {
- MockElasticsearchError(boolean isElastic) {
+class MockElasticsearchException extends ElasticsearchException {
+ MockElasticsearchException(boolean elastic, boolean notFound) {
this(new Exception())
- this.isElastic = isElastic
+ this.elastic = elastic
+ this.notFound = notFound
}
- MockElasticsearchError(Exception ex) {
+ MockElasticsearchException(Exception ex) {
super(ex)
}
}