This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 6542505a50 NIFI-10797 add customisable Elasticsearch REST Client 
config and Elasticsearch Cluster Sniffer
6542505a50 is described below

commit 6542505a5071f7bd1152daea5b1763b2af618a74
Author: Chris Sampson <[email protected]>
AuthorDate: Fri Nov 11 15:12:05 2022 +0000

    NIFI-10797 add customisable Elasticsearch REST Client config and 
Elasticsearch Cluster Sniffer
    
    Signed-off-by: Joe Gresock <[email protected]>
    
    This closes #6658.
---
 .../elasticsearch/ElasticSearchClientService.java  | 116 ++++++-
 .../nifi-elasticsearch-client-service/pom.xml      |   4 +
 .../ElasticSearchClientServiceImpl.java            | 334 ++++++++++++++++-----
 .../additionalDetails.html                         |  54 ++++
 .../nifi/elasticsearch/SearchResponseTest.java     |  24 +-
 .../TestElasticSearchClientService.java            |   4 +-
 .../nifi/elasticsearch/TestSchemaRegistry.java     |   6 +-
 .../integration/AbstractElasticsearch_IT.java      |  11 +
 .../integration/ElasticSearchClientService_IT.java | 160 +++++++++-
 .../integration/AbstractElasticsearchITBase.java   |  57 ++--
 nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml |  36 ++-
 11 files changed, 656 insertions(+), 150 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
index 4664ae1a64..7e80f345d9 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
@@ -16,8 +16,6 @@
  */
 package org.apache.nifi.elasticsearch;
 
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
@@ -32,8 +30,6 @@ import org.apache.nifi.ssl.SSLContextService;
 import java.util.List;
 import java.util.Map;
 
-@Tags({"elasticsearch", "client"})
-@CapabilityDescription("A controller service for accessing an Elasticsearch 
client.")
 public interface ElasticSearchClientService extends ControllerService, 
VerifiableControllerService {
     PropertyDescriptor HTTP_HOSTS = new PropertyDescriptor.Builder()
             .name("el-cs-http-hosts")
@@ -148,6 +144,118 @@ public interface ElasticSearchClientService extends 
ControllerService, Verifiabl
             .required(true)
             .build();
 
+    PropertyDescriptor COMPRESSION = new PropertyDescriptor.Builder()
+            .name("el-cs-enable-compression")
+            .displayName("Enable Compression")
+            .description("Whether the REST client should compress requests 
using gzip content encoding and add the " +
+                    "\"Accept-Encoding: gzip\" header to receive compressed 
responses")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    PropertyDescriptor SEND_META_HEADER = new PropertyDescriptor.Builder()
+            .name("el-cs-send-meta-header")
+            .displayName("Send Meta Header")
+            .description("Whether to send a \"X-Elastic-Client-Meta\" header 
that describes the runtime environment. " +
+                    "It contains information that is similar to what could be 
found in User-Agent. " +
+                    "Using a separate header allows applications to use 
User-Agent for their own needs, " +
+                    "e.g. to identify application version or other environment 
information")
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .required(true)
+            .build();
+
+    PropertyDescriptor STRICT_DEPRECATION = new PropertyDescriptor.Builder()
+            .name("el-cs-strict-deprecation")
+            .displayName("Strict Deprecation")
+            .description("Whether the REST client should return any response 
containing at least one warning header as a failure")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    AllowableValue NODE_SELECTOR_ANY = new AllowableValue("ANY", "Any",
+            "Select any Elasticsearch node to handle requests");
+    AllowableValue NODE_SELECTOR_SKIP_DEDICATED_MASTERS = new 
AllowableValue("SKIP_DEDICATED_MASTERS", "Skip Dedicated Masters",
+            "Skip dedicated Elasticsearch master nodes for handling request");
+
+    PropertyDescriptor NODE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("el-cs-node-selector")
+            .displayName("Node Selector")
+            .description("Selects Elasticsearch nodes that can receive 
requests. Used to keep requests away from dedicated Elasticsearch master nodes")
+            .allowableValues(NODE_SELECTOR_ANY, 
NODE_SELECTOR_SKIP_DEDICATED_MASTERS)
+            .defaultValue(NODE_SELECTOR_ANY.getValue())
+            .required(true)
+            .build();
+
+    PropertyDescriptor PATH_PREFIX = new PropertyDescriptor.Builder()
+            .name("el-cs-path-prefix")
+            .displayName("Path Prefix")
+            .description("Sets the path's prefix for every request used by the 
http client. " +
+                    "For example, if this is set to \"/my/path\", then any 
client request will become \"/my/path/\" + endpoint. " +
+                    "In essence, every request's endpoint is prefixed by this 
pathPrefix. " +
+                    "The path prefix is useful for when Elasticsearch is 
behind a proxy that provides a base path or a proxy that requires all paths to 
start with '/'; " +
+                    "it is not intended for other purposes and it should not 
be supplied in other scenarios")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    PropertyDescriptor SNIFF_CLUSTER_NODES = new PropertyDescriptor.Builder()
+            .name("el-cs-sniff-cluster-nodes")
+            .displayName("Sniff Cluster Nodes")
+            .description("Periodically sniff for nodes within the 
Elasticsearch cluster via the Elasticsearch Node Info API. " +
+                    "If Elasticsearch security features are enabled (default 
to \"true\" for 8.x+), the Elasticsearch user must " +
+                    "have the \"monitor\" or \"manage\" cluster privilege to 
use this API." +
+                    "Note that all " + HTTP_HOSTS.getDisplayName() + " (and 
those that may be discovered within the cluster " +
+                    "using the Sniffer) must use the same protocol, e.g. http 
or https, and be contactable using the same client settings. " +
+                    "Finally the Elasticsearch \"network.publish_host\" must 
match one of the \"network.bind_host\" list entries " +
+                    "see 
https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html
 for more information")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    PropertyDescriptor SNIFF_ON_FAILURE = new PropertyDescriptor.Builder()
+            .name("el-cs-sniff-failure")
+            .displayName("Sniff on Failure")
+            .description("Enable sniffing on failure, meaning that after each 
failure the Elasticsearch nodes list gets updated " +
+                    "straightaway rather than at the following ordinary 
sniffing round")
+            .dependsOn(SNIFF_CLUSTER_NODES, "true")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    PropertyDescriptor SNIFFER_INTERVAL = new PropertyDescriptor.Builder()
+            .name("el-cs-sniffer-interval")
+            .displayName("Sniffer Interval")
+            .description("Interval between Cluster sniffer operations")
+            .dependsOn(SNIFF_CLUSTER_NODES, "true")
+            .defaultValue("5 mins")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .required(true)
+            .build();
+
+    PropertyDescriptor SNIFFER_REQUEST_TIMEOUT = new 
PropertyDescriptor.Builder()
+            .name("el-cs-sniffer-request-timeout")
+            .displayName("Sniffer Request Timeout")
+            .description("Cluster sniffer timeout for node info requests")
+            .dependsOn(SNIFF_CLUSTER_NODES, "true")
+            .defaultValue("1 sec")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .required(true)
+            .build();
+
+    PropertyDescriptor SNIFFER_FAILURE_DELAY = new PropertyDescriptor.Builder()
+            .name("el-cs-sniffer-failure-delay")
+            .displayName("Sniffer Failure Delay")
+            .description("Delay between an Elasticsearch request failure and 
updating available Cluster nodes using the Sniffer")
+            .dependsOn(SNIFF_ON_FAILURE, "true")
+            .defaultValue("1 min")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .required(true)
+            .build();
+
     /**
      * Index a document.
      *
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
index 9da490b8af..3cc3f98bef 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
@@ -132,6 +132,10 @@
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>elasticsearch-rest-client</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>elasticsearch-rest-client-sniffer</artifactId>
+        </dependency>
 
         <!-- test dependencies -->
         <dependency>
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index 9658c2ce86..3fceedc8ab 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -32,6 +32,9 @@ import org.apache.http.entity.ContentType;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.message.BasicHeader;
 import org.apache.http.nio.entity.NStringEntity;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.ConfigVerificationResult;
@@ -40,19 +43,26 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.proxy.ProxyConfiguration;
 import org.apache.nifi.proxy.ProxyConfigurationService;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.StringUtils;
+import org.elasticsearch.client.Node;
+import org.elasticsearch.client.NodeSelector;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.sniff.ElasticsearchNodesSniffer;
+import org.elasticsearch.client.sniff.SniffOnFailureListener;
+import org.elasticsearch.client.sniff.Sniffer;
 
 import javax.net.ssl.SSLContext;
 import java.io.ByteArrayOutputStream;
@@ -73,12 +83,23 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+@Tags({"elasticsearch", "elasticsearch6", "elasticsearch7", "elasticsearch8", 
"client"})
+@CapabilityDescription("A controller service for accessing an Elasticsearch 
client. " +
+        "Uses the Elasticsearch REST Client (7.13.4, the last version before 
client connections verify" +
+        "the server is Elastic provided, this should allow for connections to 
compatible alternatives, e.g. AWS OpenSearch)")
+@DynamicProperty(
+        name = "The name of a Request Header to add",
+        value = "The value of the Header",
+        expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY,
+        description = "Adds the specified property name/value as a Request 
Header in the Elasticsearch requests.")
 public class ElasticSearchClientServiceImpl extends AbstractControllerService 
implements ElasticSearchClientService {
     public static final String VERIFICATION_STEP_CONNECTION = "Elasticsearch 
Connection";
     public static final String VERIFICATION_STEP_CLIENT_SETUP = "Elasticsearch 
Rest Client Setup";
     public static final String VERIFICATION_STEP_WARNINGS = "Elasticsearch 
Warnings";
+    public static final String VERIFICATION_STEP_SNIFFER = "Elasticsearch 
Sniffer";
 
     private ObjectMapper mapper;
 
@@ -86,6 +107,8 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
 
     private RestClient client;
 
+    private Sniffer sniffer;
+
     private String url;
     private Charset responseCharset;
     private ObjectWriter prettyPrintWriter;
@@ -93,6 +116,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
     static {
         final List<PropertyDescriptor> props = new ArrayList<>();
         props.add(HTTP_HOSTS);
+        props.add(PATH_PREFIX);
         props.add(AUTHORIZATION_SCHEME);
         props.add(USERNAME);
         props.add(PASSWORD);
@@ -104,6 +128,15 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
         props.add(SOCKET_TIMEOUT);
         props.add(CHARSET);
         props.add(SUPPRESS_NULLS);
+        props.add(COMPRESSION);
+        props.add(SEND_META_HEADER);
+        props.add(STRICT_DEPRECATION);
+        props.add(NODE_SELECTOR);
+        props.add(SNIFF_CLUSTER_NODES);
+        props.add(SNIFFER_INTERVAL);
+        props.add(SNIFFER_REQUEST_TIMEOUT);
+        props.add(SNIFF_ON_FAILURE);
+        props.add(SNIFFER_FAILURE_DELAY);
 
         properties = Collections.unmodifiableList(props);
     }
@@ -113,9 +146,20 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
         return properties;
     }
 
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+                
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                .dynamic(true)
+                .build();
+    }
+
     @Override
     protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
-        final List<ValidationResult> results = new 
ArrayList<>(super.customValidate(validationContext));
+        final List<ValidationResult> results = new ArrayList<>(1);
 
         final AuthorizationScheme authorizationScheme = 
AuthorizationScheme.valueOf(validationContext.getProperty(AUTHORIZATION_SCHEME).getValue());
 
@@ -126,6 +170,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
         final boolean apiKeySet = 
validationContext.getProperty(API_KEY).isSet();
 
         final SSLContextService sslService = 
validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
         if (authorizationScheme == AuthorizationScheme.PKI && (sslService == 
null || !sslService.isKeyStoreConfigured())) {
             results.add(new 
ValidationResult.Builder().subject(PROP_SSL_CONTEXT_SERVICE.getName()).valid(false)
                     .explanation(String.format("if '%s' is '%s' then '%s' must 
be set and specify a Keystore for mutual TLS encryption.",
@@ -146,6 +191,13 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
             addAuthorizationPropertiesValidationIssue(results, API_KEY, 
API_KEY_ID);
         }
 
+        final boolean sniffClusterNodes = 
validationContext.getProperty(SNIFF_CLUSTER_NODES).asBoolean();
+        final boolean sniffOnFailure = 
validationContext.getProperty(SNIFF_ON_FAILURE).asBoolean();
+        if (sniffOnFailure && !sniffClusterNodes) {
+            results.add(new 
ValidationResult.Builder().subject(SNIFF_ON_FAILURE.getName()).valid(false)
+                    .explanation(String.format("'%s' cannot be enabled if '%s' 
is disabled", SNIFF_ON_FAILURE.getDisplayName(), 
SNIFF_CLUSTER_NODES.getDisplayName())).build());
+        }
+
         return results;
     }
 
@@ -160,6 +212,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
     public void onEnabled(final ConfigurationContext context) throws 
InitializationException {
         try {
             this.client = setupClient(context);
+            this.sniffer = setupSniffer(context, this.client);
             responseCharset = 
Charset.forName(context.getProperty(CHARSET).getValue());
 
             // re-create the ObjectMapper in case the SUPPRESS_NULLS property 
has changed - the JsonInclude settings aren't dynamic
@@ -178,6 +231,11 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
 
     @OnDisabled
     public void onDisabled() throws IOException {
+        if (this.sniffer != null) {
+            this.sniffer.close();
+            this.sniffer = null;
+        }
+
         if (this.client != null) {
             this.client.close();
             this.client = null;
@@ -194,6 +252,8 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
                 .verificationStepName(VERIFICATION_STEP_CONNECTION);
         final ConfigVerificationResult.Builder warningsResult = new 
ConfigVerificationResult.Builder()
                 .verificationStepName(VERIFICATION_STEP_WARNINGS);
+        final ConfigVerificationResult.Builder snifferResult = new 
ConfigVerificationResult.Builder()
+                .verificationStepName(VERIFICATION_STEP_SNIFFER);
 
         // configure the Rest Client
         try (final RestClient verifyClient = setupClient(context)) {
@@ -201,7 +261,10 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
 
             // try to fetch the Elasticsearch root endpoint (system summary)
             verifyRootConnection(verifyClient, connectionResult, 
warningsResult);
-        }catch (final MalformedURLException mue) {
+
+            // try sniffing for cluster nodes
+            verifySniffer(context, verifyClient, snifferResult);
+        } catch (final MalformedURLException mue) {
             clientSetupResult.outcome(ConfigVerificationResult.Outcome.FAILED)
                     .explanation("Incorrect/invalid " + 
ElasticSearchClientService.HTTP_HOSTS.getDisplayName());
         } catch (final InitializationException ie) {
@@ -219,21 +282,78 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
                         .explanation("Elasticsearch Rest Client not 
configured");
                 
warningsResult.outcome(ConfigVerificationResult.Outcome.SKIPPED)
                         .explanation("Elasticsearch Rest Client not 
configured");
+                snifferResult.outcome(ConfigVerificationResult.Outcome.SKIPPED)
+                        .explanation("Elasticsearch Rest Client not 
configured");
             }
 
             results.add(clientSetup);
             results.add(connectionResult.build());
             results.add(warningsResult.build());
+            results.add(snifferResult.build());
         }
 
         return results;
     }
 
+    private void verifySniffer(final ConfigurationContext context, final 
RestClient verifyClient, final ConfigVerificationResult.Builder snifferResult) {
+        try (final Sniffer verifySniffer = setupSniffer(context, 
verifyClient)) {
+            if (verifySniffer != null) {
+                final List<Node> originalNodes = verifyClient.getNodes();
+                // cannot access the NodesSniffer from the parent Sniffer, so 
set up a second instance here
+                final ElasticsearchNodesSniffer elasticsearchNodesSniffer = 
setupElasticsearchNodesSniffer(context, verifyClient);
+                final List<Node> nodes = elasticsearchNodesSniffer.sniff();
+
+                // attempt to connect to each Elasticsearch Node using the 
RestClient
+                final AtomicInteger successfulInstances = new AtomicInteger(0);
+                final AtomicInteger warningInstances = new AtomicInteger(0);
+                nodes.forEach(n -> {
+                    try {
+                        verifyClient.setNodes(Collections.singletonList(n));
+                        final List<String> warnings = 
getElasticsearchRoot(verifyClient);
+                        successfulInstances.getAndIncrement();
+                        if (!warnings.isEmpty()) {
+                            warningInstances.getAndIncrement();
+                        }
+                    } catch (final Exception ex) {
+                        getLogger().warn("Elasticsearch Node {} connection 
failed", n.getHost().toURI(), ex);
+                    }
+                });
+                // reset Nodes list on RestClient to pre-Sniffer state (match 
user's Verify settings)
+                verifyClient.setNodes(originalNodes);
+
+                if (successfulInstances.get() < nodes.size()) {
+                    
snifferResult.outcome(ConfigVerificationResult.Outcome.FAILED).explanation(
+                            String.format("Sniffing for Elasticsearch cluster 
nodes found %d nodes but %d could not be contacted (%d with warnings during 
connection tests)",
+                                    nodes.size(), nodes.size() - 
successfulInstances.get(), warningInstances.get())
+                    );
+                } else {
+                    
snifferResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation(
+                            String.format("Sniffing for Elasticsearch cluster 
nodes found %d nodes (%d with warnings during connection tests)",
+                                    nodes.size(), warningInstances.get())
+                    );
+                }
+            } else {
+                
snifferResult.outcome(ConfigVerificationResult.Outcome.SKIPPED).explanation("Sniff
 on Connection not enabled");
+            }
+        } catch (final Exception ex) {
+            getLogger().warn("Unable to sniff for Elasticsearch cluster 
nodes", ex);
+
+            snifferResult.outcome(ConfigVerificationResult.Outcome.FAILED)
+                    .explanation("Sniffing for Elasticsearch cluster nodes 
failed");
+        }
+    }
+
+    private List<String> getElasticsearchRoot(final RestClient verifyClient) 
throws IOException {
+        final Response response = verifyClient.performRequest(new 
Request("GET", "/"));
+        final List<String> warnings = parseResponseWarningHeaders(response);
+        parseResponse(response);
+
+        return warnings;
+    }
+
     private void verifyRootConnection(final RestClient verifyClient, final 
ConfigVerificationResult.Builder connectionResult, final 
ConfigVerificationResult.Builder warningsResult) {
         try {
-            final Response response = verifyClient.performRequest(new 
Request("GET", "/"));
-            final List<String> warnings = 
parseResponseWarningHeaders(response);
-            parseResponse(response);
+            final List<String> warnings = getElasticsearchRoot(verifyClient);
 
             
connectionResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL);
             if (warnings.isEmpty()) {
@@ -253,80 +373,121 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
     }
 
     private RestClient setupClient(final ConfigurationContext context) throws 
MalformedURLException, InitializationException {
-        final AuthorizationScheme authorizationScheme = 
AuthorizationScheme.valueOf(context.getProperty(AUTHORIZATION_SCHEME).getValue());
+        final Integer connectTimeout = 
context.getProperty(CONNECT_TIMEOUT).asInteger();
+        final Integer socketTimeout = 
context.getProperty(SOCKET_TIMEOUT).asInteger();
+
+        final NodeSelector nodeSelector = 
NODE_SELECTOR_ANY.getValue().equals(context.getProperty(NODE_SELECTOR).getValue())
+                ? NodeSelector.ANY
+                : NodeSelector.SKIP_DEDICATED_MASTERS;
+        final String pathPrefix = context.getProperty(PATH_PREFIX).getValue();
+
+        final boolean compress = context.getProperty(COMPRESSION).asBoolean();
+        final boolean sendMetaHeader = 
context.getProperty(SEND_META_HEADER).asBoolean();
+        final boolean strictDeprecation = 
context.getProperty(STRICT_DEPRECATION).asBoolean();
+        final boolean sniffOnFailure = 
context.getProperty(SNIFF_ON_FAILURE).asBoolean();
+
+        final RestClientBuilder builder = 
RestClient.builder(getHttpHosts(context));
+        addAuthAndProxy(context, builder)
+                .setRequestConfigCallback(requestConfigBuilder -> {
+                    requestConfigBuilder.setConnectTimeout(connectTimeout);
+                    requestConfigBuilder.setSocketTimeout(socketTimeout);
+                    return requestConfigBuilder;
+                })
+                .setCompressionEnabled(compress)
+                .setMetaHeaderEnabled(sendMetaHeader)
+                .setStrictDeprecationMode(strictDeprecation)
+                .setNodeSelector(nodeSelector);
+
+        if (sniffOnFailure && sniffer != null) {
+            final SniffOnFailureListener sniffOnFailureListener = new 
SniffOnFailureListener();
+            sniffOnFailureListener.setSniffer(sniffer);
+            builder.setFailureListener(sniffOnFailureListener);
+        }
+
+        if (StringUtils.isNotBlank(pathPrefix)) {
+            builder.setPathPrefix(pathPrefix);
+        }
 
+        return builder.build();
+    }
+
+    private HttpHost[] getHttpHosts(final ConfigurationContext context) throws 
MalformedURLException {
         final String hosts = 
context.getProperty(HTTP_HOSTS).evaluateAttributeExpressions().getValue();
-        final String[] hostsSplit = hosts.split(",\\s*");
-        this.url = hostsSplit[0];
-        final SSLContextService sslService =
-                
context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
+        final List<String> hostsSplit = 
Arrays.stream(hosts.split(",\\s*")).map(String::trim).collect(Collectors.toList());
+        this.url = hostsSplit.get(0);
+        final List<HttpHost> hh = new ArrayList<>(hostsSplit.size());
+        for (final String host : hostsSplit) {
+            final URL u = new URL(host);
+            hh.add(new HttpHost(u.getHost(), u.getPort(), u.getProtocol()));
+        }
+
+        return hh.toArray(new HttpHost[0]);
+    }
+
+    private RestClientBuilder addAuthAndProxy(final ConfigurationContext 
context, final RestClientBuilder builder) throws InitializationException {
+        final AuthorizationScheme authorizationScheme = 
AuthorizationScheme.valueOf(context.getProperty(AUTHORIZATION_SCHEME).getValue());
+
         final String username = 
context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
         final String password = 
context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
 
         final String apiKeyId = context.getProperty(API_KEY_ID).getValue();
         final String apiKey = context.getProperty(API_KEY).getValue();
 
-        final Integer connectTimeout = 
context.getProperty(CONNECT_TIMEOUT).asInteger();
-        final Integer socketTimeout = 
context.getProperty(SOCKET_TIMEOUT).asInteger();
-
+        final SSLContext sslContext = getSSLContext(context);
         final ProxyConfigurationService proxyConfigurationService = 
context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
 
-        final HttpHost[] hh = new HttpHost[hostsSplit.length];
-        for (int x = 0; x < hh.length; x++) {
-            final URL u = new URL(hostsSplit[x]);
-            hh[x] = new HttpHost(u.getHost(), u.getPort(), u.getProtocol());
-        }
-
-        final SSLContext sslContext;
-        try {
-            sslContext = (sslService != null && 
(sslService.isKeyStoreConfigured() || sslService.isTrustStoreConfigured()))
-                    ? sslService.createContext() : null;
-        } catch (final Exception e) {
-            getLogger().error("Error building up SSL Context from the supplied 
configuration.", e);
-            throw new InitializationException(e);
-        }
+        return builder.setHttpClientConfigCallback(httpClientBuilder -> {
+            if (sslContext != null) {
+                httpClientBuilder.setSSLContext(sslContext);
+            }
 
-        final RestClientBuilder builder = RestClient.builder(hh)
-                .setHttpClientConfigCallback(httpClientBuilder -> {
-                    if (sslContext != null) {
-                        httpClientBuilder.setSSLContext(sslContext);
-                    }
+            CredentialsProvider credentialsProvider = null;
+            if (AuthorizationScheme.BASIC == authorizationScheme && username 
!= null && password != null) {
+                credentialsProvider = addBasicAuthCredentials(null, 
AuthScope.ANY, username, password);
+            }
 
-                    CredentialsProvider credentialsProvider = null;
-                    if (AuthorizationScheme.BASIC == authorizationScheme && 
username != null && password != null) {
-                        credentialsProvider = addCredentials(null, 
AuthScope.ANY, username, password);
-                    }
+            final List<Header> defaultHeaders = 
getDefaultHeadersFromDynamicProperties(context);
+            if (AuthorizationScheme.API_KEY == authorizationScheme && apiKeyId 
!= null && apiKey != null) {
+                defaultHeaders.add(createApiKeyAuthorizationHeader(apiKeyId, 
apiKey));
+            }
+            if (!defaultHeaders.isEmpty()) {
+                builder.setDefaultHeaders(defaultHeaders.toArray(new 
Header[0]));
+            }
 
-                    if (AuthorizationScheme.API_KEY == authorizationScheme && 
apiKeyId != null && apiKey != null) {
-                        
httpClientBuilder.setDefaultHeaders(Collections.singletonList(createApiKeyAuthorizationHeader(apiKeyId,
 apiKey)));
-                    }
+            if (proxyConfigurationService != null) {
+                final ProxyConfiguration proxyConfiguration = 
proxyConfigurationService.getConfiguration();
+                if (Proxy.Type.HTTP == proxyConfiguration.getProxyType()) {
+                    final HttpHost proxy = new 
HttpHost(proxyConfiguration.getProxyServerHost(), 
proxyConfiguration.getProxyServerPort(), "http");
+                    httpClientBuilder.setProxy(proxy);
 
-                    if (proxyConfigurationService != null) {
-                        final ProxyConfiguration proxyConfiguration = 
proxyConfigurationService.getConfiguration();
-                        if (Proxy.Type.HTTP == 
proxyConfiguration.getProxyType()) {
-                            final HttpHost proxy = new 
HttpHost(proxyConfiguration.getProxyServerHost(), 
proxyConfiguration.getProxyServerPort(), "http");
-                            httpClientBuilder.setProxy(proxy);
+                    credentialsProvider = 
addBasicAuthCredentials(credentialsProvider, new AuthScope(proxy), 
proxyConfiguration.getProxyUserName(), 
proxyConfiguration.getProxyUserPassword());
+                }
+            }
 
-                            credentialsProvider = 
addCredentials(credentialsProvider, new AuthScope(proxy), 
proxyConfiguration.getProxyUserName(), 
proxyConfiguration.getProxyUserPassword());
-                        }
-                    }
+            if (credentialsProvider != null) {
+                
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+            }
 
-                    if (credentialsProvider != null) {
-                        
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
-                    }
+            return httpClientBuilder;
+        });
+    }
 
-                    return httpClientBuilder;
-                })
-                .setRequestConfigCallback(requestConfigBuilder -> {
-                    requestConfigBuilder.setConnectTimeout(connectTimeout);
-                    requestConfigBuilder.setSocketTimeout(socketTimeout);
-                    return requestConfigBuilder;
-                });
+    private SSLContext getSSLContext(final ConfigurationContext context) 
throws InitializationException {
+        final SSLContextService sslService =
+                
context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
 
-        return builder.build();
+        try {
+            return (sslService != null && (sslService.isKeyStoreConfigured() 
|| sslService.isTrustStoreConfigured()))
+                    ? sslService.createContext() : null;
+        } catch (final Exception e) {
+            getLogger().error("Error building up SSL Context from the supplied 
configuration.", e);
+            throw new InitializationException(e);
+        }
     }
 
-    private CredentialsProvider addCredentials(final CredentialsProvider 
credentialsProvider, final AuthScope authScope, final String username, final 
String password) {
+    private CredentialsProvider addBasicAuthCredentials(final 
CredentialsProvider credentialsProvider, final AuthScope authScope,
+                                                        final String username, 
final String password) {
         final CredentialsProvider cp = credentialsProvider != null ? 
credentialsProvider : new BasicCredentialsProvider();
 
         if (StringUtils.isNotBlank(username) && 
StringUtils.isNotBlank(password)) {
@@ -339,12 +500,48 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
         return cp;
     }
 
-    private BasicHeader createApiKeyAuthorizationHeader(String apiKeyId, 
String apiKey) {
+    private List<Header> getDefaultHeadersFromDynamicProperties(final 
ConfigurationContext context) {
+        return context.getProperties().entrySet().stream()
+                // filter non-null dynamic properties
+                .filter(e -> e.getKey().isDynamic() && 
StringUtils.isNotBlank(e.getValue())
+                        && 
StringUtils.isNotBlank(context.getProperty(e.getKey()).evaluateAttributeExpressions().getValue())
+                )
+                // convert to Headers
+                .map(e -> new BasicHeader(e.getKey().getName(),
+                        
context.getProperty(e.getKey()).evaluateAttributeExpressions().getValue()))
+                .collect(Collectors.toList());
+    }
+
+    private BasicHeader createApiKeyAuthorizationHeader(final String apiKeyId, 
final String apiKey) {
         final String apiKeyCredentials = String.format("%s:%s", apiKeyId, 
apiKey);
         final String apiKeyAuth = 
Base64.getEncoder().encodeToString((apiKeyCredentials).getBytes(StandardCharsets.UTF_8));
         return new BasicHeader("Authorization", "ApiKey " + apiKeyAuth);
     }
 
+    private Sniffer setupSniffer(final ConfigurationContext context, final 
RestClient restClient) {
+        final boolean sniffClusterNodes = 
context.getProperty(SNIFF_CLUSTER_NODES).asBoolean();
+        final int snifferIntervalMillis = 
context.getProperty(SNIFFER_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        final int snifferFailureDelayMillis = 
context.getProperty(SNIFFER_FAILURE_DELAY).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+
+        if (sniffClusterNodes) {
+            return Sniffer.builder(restClient)
+                    .setSniffIntervalMillis(snifferIntervalMillis)
+                    .setSniffAfterFailureDelayMillis(snifferFailureDelayMillis)
+                    .setNodesSniffer(setupElasticsearchNodesSniffer(context, 
restClient))
+                    .build();
+        }
+
+        return null;
+    }
+
+    private ElasticsearchNodesSniffer setupElasticsearchNodesSniffer(final 
ConfigurationContext context, final RestClient restClient) {
+        final Long snifferRequestTimeoutMillis = 
context.getProperty(SNIFFER_REQUEST_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+        final ElasticsearchNodesSniffer.Scheme scheme = 
this.url.toLowerCase(Locale.getDefault()).startsWith("https://";)
+                ? ElasticsearchNodesSniffer.Scheme.HTTPS : 
ElasticsearchNodesSniffer.Scheme.HTTP;
+
+        return new ElasticsearchNodesSniffer(restClient, 
snifferRequestTimeoutMillis, scheme);
+    }
+
     private void appendIndex(final StringBuilder sb, final String index) {
         if (StringUtils.isNotBlank(index) && !"/".equals(index)) {
             if (!index.startsWith("/")) {
@@ -798,10 +995,6 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
         }
 
         if (getLogger().isDebugEnabled()) {
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
-            entity.writeTo(out);
-            out.close();
-
             StringBuilder builder = new StringBuilder(1000);
             builder.append("Dumping Elasticsearch REST request...\n")
                     .append("HTTP Method: ")
@@ -812,11 +1005,18 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
                     .append("\n")
                     .append("Parameters: ")
                     .append(prettyPrintWriter.writeValueAsString(parameters))
-                    .append("\n")
-                    .append("Request body: ")
-                    .append(new String(out.toByteArray()))
                     .append("\n");
 
+            if (entity != null) {
+                final ByteArrayOutputStream out = new ByteArrayOutputStream();
+                entity.writeTo(out);
+                out.close();
+
+                builder.append("Request body: ")
+                        .append(out)
+                        .append("\n");
+            }
+
             getLogger().debug(builder.toString());
         }
 
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/docs/org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl/additionalDetails.html
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/docs/org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl/additionalDetails.html
new file mode 100644
index 0000000000..f0934cafe6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/docs/org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl/additionalDetails.html
@@ -0,0 +1,54 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>ElasticSearchClientServiceImpl</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css" />
+</head>
+
+<body>
+
+<h2>Sniffing</h2>
+<p>
+    The Elasticsearch Sniffer can be used to locate Elasticsearch Nodes within 
a Cluster to which you are connecting.
+    This can be beneficial if your cluster dynamically changes over time, e.g. 
new Nodes are added to maintain performance during heavy load.
+</p>
+<p>
+    Sniffing can also be used to update the list of Hosts within the Cluster 
if a connection Failure is encountered during operation.
+    In order to "Sniff on Failure", you <b>must</b> also enable "Sniff Cluster 
Nodes".
+</p>
+<p>
+    Not all situations make sense to use Sniffing, for example if:
+    <ul>
+        <li>Elasticsearch is situated behind a load balancer, which 
dynamically routes connections from NiFi</li>
+        <li>Elasticsearch is on a different network to NiFi</li>
+    </ul>
+</p>
+<p>
+    There may also be need to set some of the <a 
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html";>
+    Elasticsearch Networking Advanced Settings</a>, such as 
<code>network.publish_host</code> to ensure that
+    the HTTP Hosts found by the Sniffer are accessible by NiFi. For example, 
Elasticsearch may use a network internal
+    <code>publish_host</code> that is inaccessible to NiFi, but instead should 
use an address/IP that NiFi understands.
+    It may also be necessary to add this same address to Elasticsearch's 
<code>network.bind_host</code> list.
+</p>
+<p>
+    See <a 
href="https://www.elastic.co/blog/elasticsearch-sniffing-best-practices-what-when-why-how";>
+    Elasticsearch sniffing best practices: What, when, why, how</a> for more 
details of the best practices.
+</p>
+
+</body>
+</html>
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/SearchResponseTest.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/SearchResponseTest.java
index 9f99ec0da8..bf82c8853d 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/SearchResponseTest.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/SearchResponseTest.java
@@ -20,7 +20,7 @@ package org.apache.nifi.elasticsearch;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -31,17 +31,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class SearchResponseTest {
     @Test
     void test() {
-        List<Map<String, Object>> results = new ArrayList<>();
-        Map<String, Object> aggs    = new HashMap<>();
-        String pitId = "pitId";
-        String scrollId = "scrollId";
-        String searchAfter = "searchAfter";
-        int num     = 10;
-        int took    = 100;
-        boolean timeout = false;
-        List<String> warnings = Arrays.asList("auth");
-        SearchResponse response = new SearchResponse(results, aggs, pitId, 
scrollId, searchAfter, num, took, timeout, warnings);
-        String str = response.toString();
+        final List<Map<String, Object>> results = new ArrayList<>();
+        final Map<String, Object> aggs    = new HashMap<>();
+        final String pitId = "pitId";
+        final String scrollId = "scrollId";
+        final String searchAfter = "searchAfter";
+        final int num     = 10;
+        final int took    = 100;
+        final boolean timeout = false;
+        final List<String> warnings = Collections.singletonList("auth");
+        final SearchResponse response = new SearchResponse(results, aggs, 
pitId, scrollId, searchAfter, num, took, timeout, warnings);
+        final String str = response.toString();
 
         assertEquals(results, response.getHits());
         assertEquals(aggs, response.getAggregations());
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java
index 4ee196a9dc..c85879179f 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java
@@ -28,10 +28,10 @@ import java.util.List;
 import java.util.Map;
 
 public class TestElasticSearchClientService extends AbstractControllerService 
implements ElasticSearchClientService {
-    private Map<String, Object> data;
+    private final Map<String, Object> data;
 
     public TestElasticSearchClientService() {
-        data = new HashMap<>();
+        data = new HashMap<>(4, 1);
         data.put("username", "john.smith");
         data.put("password", "testing1234");
         data.put("email", "[email protected]");
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestSchemaRegistry.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestSchemaRegistry.java
index 813ce1c5d2..7868c5ccd5 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestSchemaRegistry.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestSchemaRegistry.java
@@ -26,7 +26,7 @@ import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.SchemaIdentifier;
 
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -36,12 +36,12 @@ import static 
org.apache.nifi.schema.access.SchemaField.SCHEMA_NAME;
 public class TestSchemaRegistry extends AbstractControllerService implements 
SchemaRegistry {
     @Override
     public RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) {
-        List<RecordField> fields = Arrays.asList(new RecordField("msg", 
RecordFieldType.STRING.getDataType()));
+        List<RecordField> fields = Collections.singletonList(new 
RecordField("msg", RecordFieldType.STRING.getDataType()));
         return new SimpleRecordSchema(fields);
     }
 
     @Override
     public Set<SchemaField> getSuppliedSchemaFields() {
-        return new HashSet<>(Arrays.asList(SCHEMA_NAME));
+        return new HashSet<>(Collections.singletonList(SCHEMA_NAME));
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
index 104e504b23..c9dfc752e6 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
@@ -38,6 +38,8 @@ abstract class AbstractElasticsearch_IT extends 
AbstractElasticsearchITBase {
         runner = 
TestRunners.newTestRunner(TestControllerServiceProcessor.class);
         service = new ElasticSearchClientServiceImpl();
         runner.addControllerService(CLIENT_SERVICE_NAME, service);
+        runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, 
CLIENT_SERVICE_NAME);
+
         runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, 
elasticsearchHost);
         runner.setProperty(service, 
ElasticSearchClientService.CONNECT_TIMEOUT, "10000");
         runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, 
"60000");
@@ -45,6 +47,15 @@ abstract class AbstractElasticsearch_IT extends 
AbstractElasticsearchITBase {
         runner.setProperty(service, 
ElasticSearchClientService.AUTHORIZATION_SCHEME, 
AuthorizationScheme.BASIC.getValue());
         runner.setProperty(service, ElasticSearchClientService.USERNAME, 
"elastic");
         runner.setProperty(service, ElasticSearchClientService.PASSWORD, 
ELASTIC_USER_PASSWORD);
+        runner.removeProperty(service, ElasticSearchClientService.API_KEY);
+        runner.removeProperty(service, ElasticSearchClientService.API_KEY_ID);
+        runner.setProperty(service, ElasticSearchClientService.COMPRESSION, 
"false");
+        runner.setProperty(service, 
ElasticSearchClientService.SEND_META_HEADER, "true");
+        runner.setProperty(service, 
ElasticSearchClientService.STRICT_DEPRECATION, "false");
+        runner.setProperty(service, 
ElasticSearchClientService.SNIFF_CLUSTER_NODES, "false");
+        runner.setProperty(service, 
ElasticSearchClientService.SNIFF_ON_FAILURE, "false");
+        runner.removeProperty(service, ElasticSearchClientService.PATH_PREFIX);
+        runner.setProperty(service, ElasticSearchClientService.NODE_SELECTOR, 
ElasticSearchClientService.NODE_SELECTOR_ANY.getValue());
 
         runner.enableControllerService(service);
 
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
index d7f0d39874..402363aece 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
@@ -18,7 +18,12 @@
 package org.apache.nifi.elasticsearch.integration;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.compress.utils.IOUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.VerifiableControllerService;
@@ -31,7 +36,6 @@ import org.apache.nifi.elasticsearch.IndexOperationRequest;
 import org.apache.nifi.elasticsearch.IndexOperationResponse;
 import org.apache.nifi.elasticsearch.MapBuilder;
 import org.apache.nifi.elasticsearch.SearchResponse;
-import org.apache.nifi.elasticsearch.TestControllerServiceProcessor;
 import org.apache.nifi.elasticsearch.UpdateOperationResponse;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.ssl.StandardRestrictedSSLContextService;
@@ -39,11 +43,15 @@ import org.apache.nifi.ssl.StandardSSLContextService;
 import org.apache.nifi.util.MockConfigurationContext;
 import org.apache.nifi.util.MockControllerServiceLookup;
 import org.apache.nifi.util.MockVariableRegistry;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -82,13 +90,38 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
                 runner.getLogger(),
                 Collections.emptyMap()
         );
-        assertEquals(3, results.size());
+        assertEquals(4, results.size());
         assertEquals(3, results.stream().filter(result -> result.getOutcome() 
== ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
+        assertVerifySnifferSkipped(results);
+    }
+
+    @Test
+    void testVerifySniffer() {
+        runner.disableControllerService(service);
+        runner.setProperty(service, 
ElasticSearchClientService.SNIFF_CLUSTER_NODES, "true");
+        runner.setProperty(service, 
ElasticSearchClientService.SNIFF_ON_FAILURE, "false");
+        runner.enableControllerService(service);
+        assertVerifySniffer();
+
+        runner.disableControllerService(service);
+        runner.setProperty(service, 
ElasticSearchClientService.SNIFF_ON_FAILURE, "true");
+        runner.enableControllerService(service);
+        assertVerifySniffer();
+    }
+
+    private void assertVerifySniffer() {
+        final List<ConfigVerificationResult> results = 
((VerifiableControllerService) service).verify(
+                new MockConfigurationContext(service, 
getClientServiceProperties(), 
runner.getProcessContext().getControllerServiceLookup(), new 
MockVariableRegistry()),
+                runner.getLogger(),
+                Collections.emptyMap()
+        );
+        assertEquals(4, results.size());
+        assertEquals(4, results.stream().filter(result -> result.getOutcome() 
== ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
     }
 
     @Test
     void testVerifySuccessWithApiKeyAuth() throws IOException {
-        final Pair<String, String> apiKey = createApiKeyForIndex("*");
+        final Pair<String, String> apiKey = createApiKeyForIndex();
 
         runner.disableControllerService(service);
         runner.setProperty(service, 
ElasticSearchClientService.AUTHORIZATION_SCHEME, 
AuthorizationScheme.API_KEY.getValue());
@@ -103,8 +136,9 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
                 runner.getLogger(),
                 Collections.emptyMap()
         );
-        assertEquals(3, results.size());
+        assertEquals(4, results.size());
         assertEquals(3, results.stream().filter(result -> result.getOutcome() 
== ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
+        assertVerifySnifferSkipped(results);
     }
 
     @Test
@@ -117,8 +151,8 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
                 runner.getLogger(),
                 Collections.emptyMap()
         );
-        assertEquals(3, results.size());
-        assertEquals(2, results.stream().filter(result -> result.getOutcome() 
== ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
+        assertEquals(4, results.size());
+        assertEquals(3, results.stream().filter(result -> result.getOutcome() 
== ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
         assertEquals(1, results.stream().filter(
                 result -> Objects.equals(result.getVerificationStepName(), 
ElasticSearchClientServiceImpl.VERIFICATION_STEP_CLIENT_SETUP)
                         && Objects.equals(result.getExplanation(), 
"Incorrect/invalid " + ElasticSearchClientService.HTTP_HOSTS.getDisplayName())
@@ -146,8 +180,8 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
                 runner.getLogger(),
                 Collections.emptyMap()
         );
-        assertEquals(3, results.size());
-        assertEquals(2, results.stream().filter(result -> result.getOutcome() 
== ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
+        assertEquals(4, results.size());
+        assertEquals(3, results.stream().filter(result -> result.getOutcome() 
== ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
         assertEquals(1, results.stream().filter(
                 result -> Objects.equals(result.getVerificationStepName(), 
ElasticSearchClientServiceImpl.VERIFICATION_STEP_CLIENT_SETUP)
                         && Objects.equals(result.getExplanation(), 
"Incorrect/invalid " + 
ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE.getDisplayName())
@@ -167,9 +201,9 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
                 runner.getLogger(),
                 Collections.emptyMap()
         );
-        assertEquals(3, results.size());
+        assertEquals(4, results.size());
         assertEquals(1, results.stream().filter(result -> result.getOutcome() 
== ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
-        assertEquals(1, results.stream().filter(result -> result.getOutcome() 
== ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
+        assertEquals(2, results.stream().filter(result -> result.getOutcome() 
== ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
         assertEquals(1, results.stream().filter(
                 result -> Objects.equals(result.getVerificationStepName(), 
ElasticSearchClientServiceImpl.VERIFICATION_STEP_CONNECTION)
                         && Objects.equals(result.getExplanation(), "Unable to 
retrieve system summary from Elasticsearch root endpoint")
@@ -193,9 +227,9 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
                 runner.getLogger(),
                 Collections.emptyMap()
         );
-        assertEquals(3, results.size());
+        assertEquals(4, results.size());
         assertEquals(1, results.stream().filter(result -> result.getOutcome() 
== ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
-        assertEquals(1, results.stream().filter(result -> result.getOutcome() 
== ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
+        assertEquals(2, results.stream().filter(result -> result.getOutcome() 
== ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
         assertEquals(1, results.stream().filter(
                 result -> Objects.equals(result.getVerificationStepName(), 
ElasticSearchClientServiceImpl.VERIFICATION_STEP_CONNECTION)
                         && Objects.equals(result.getExplanation(), "Unable to 
retrieve system summary from Elasticsearch root endpoint")
@@ -623,6 +657,71 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
         assertFalse(service.exists("index-does-not-exist", null), "index 
exists");
     }
 
+    @Test
+    void testCompression() {
+        runner.disableControllerService(service);
+        runner.setProperty(service, ElasticSearchClientService.COMPRESSION, 
"true");
+        runner.enableControllerService(service);
+        runner.assertValid(service);
+
+        assertTrue(service.exists(INDEX, null), "index does not exist");
+    }
+
+    @Test
+    void testNoMetaHeader() {
+        runner.disableControllerService(service);
+        runner.setProperty(service, 
ElasticSearchClientService.SEND_META_HEADER, "false");
+        runner.enableControllerService(service);
+        runner.assertValid(service);
+
+        assertTrue(service.exists(INDEX, null), "index does not exist");
+    }
+
+    @Test
+    void testStrictDeprecation() {
+        runner.disableControllerService(service);
+        runner.setProperty(service, 
ElasticSearchClientService.STRICT_DEPRECATION, "true");
+        runner.enableControllerService(service);
+        runner.assertValid(service);
+
+        assertTrue(service.exists(INDEX, null), "index does not exist");
+    }
+
+    @Test
+    void testNodeSelector() {
+        runner.disableControllerService(service);
+        runner.setProperty(service, ElasticSearchClientService.NODE_SELECTOR, 
ElasticSearchClientService.NODE_SELECTOR_SKIP_DEDICATED_MASTERS.getValue());
+        runner.enableControllerService(service);
+        runner.assertValid(service);
+
+        assertTrue(service.exists(INDEX, null), "index does not exist");
+    }
+
+    @Test
+    void testRestClientRequestHeaders() {
+        runner.disableControllerService(service);
+        runner.setProperty(service, "User-Agent", "NiFi Integration Tests");
+        runner.setProperty(service, "X-Extra_header", "Request should still 
work");
+        runner.enableControllerService(service);
+        runner.assertValid(service);
+
+        assertTrue(service.exists(INDEX, null), "index does not exist");
+    }
+
+    @Test
+    void testSniffer() {
+        runner.disableControllerService(service);
+        runner.setProperty(service, 
ElasticSearchClientService.SNIFF_CLUSTER_NODES, "false");
+        runner.setProperty(service, 
ElasticSearchClientService.SNIFF_ON_FAILURE, "true");
+        runner.assertNotValid(service);
+
+        runner.setProperty(service, 
ElasticSearchClientService.SNIFF_CLUSTER_NODES, "true");
+        runner.enableControllerService(service);
+        runner.assertValid(service);
+
+        assertTrue(service.exists(INDEX, null), "index does not exist");
+    }
+
     @Test
     void testNullSuppression() throws InterruptedException {
         final Map<String, Object> doc = new HashMap<>();
@@ -659,13 +758,12 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
     }
 
     private void suppressNulls(final boolean suppressNulls) {
-        runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, 
"Client Service");
         runner.disableControllerService(service);
         runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, 
suppressNulls
                 ? ElasticSearchClientService.ALWAYS_SUPPRESS.getValue()
                 : ElasticSearchClientService.NEVER_SUPPRESS.getValue());
         runner.enableControllerService(service);
-        runner.assertValid();
+        runner.assertValid(service);
     }
 
     @Test
@@ -823,4 +921,38 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
         Thread.sleep(1000);
     }
 
+    private void assertVerifySnifferSkipped(final 
List<ConfigVerificationResult> results) {
+        assertEquals(1, results.stream().filter(
+                        result -> 
Objects.equals(result.getVerificationStepName(), 
ElasticSearchClientServiceImpl.VERIFICATION_STEP_SNIFFER)
+                                && Objects.equals(result.getExplanation(), 
"Sniff on Connection not enabled")
+                                && result.getOutcome() == 
ConfigVerificationResult.Outcome.SKIPPED).count(),
+                results.toString()
+        );
+    }
+
+    protected Pair<String, String> createApiKeyForIndex() throws IOException {
+        final String body = prettyJson(new MapBuilder()
+                .of("name", "test-api-key")
+                .of("role_descriptors", new MapBuilder()
+                        .of("test-role", new MapBuilder()
+                                .of("cluster", 
Collections.singletonList("all"))
+                                .of("index", Collections.singletonList(new 
MapBuilder()
+                                        .of("names", 
Collections.singletonList("*"))
+                                        .of("privileges", 
Collections.singletonList("all"))
+                                        .build()))
+                                .build())
+                        .build())
+                .build());
+        final String endpoint = String.format("%s/%s", elasticsearchHost, 
"_security/api_key");
+        final Request request = new Request("POST", endpoint);
+        final HttpEntity jsonBody = new NStringEntity(body, 
ContentType.APPLICATION_JSON);
+        request.setEntity(jsonBody);
+
+        final Response response = 
testDataManagementClient.performRequest(request);
+        final InputStream inputStream = response.getEntity().getContent();
+        final byte[] result = IOUtils.toByteArray(inputStream);
+        inputStream.close();
+        final Map<String, String> ret = MAPPER.readValue(new String(result, 
StandardCharsets.UTF_8), new TypeReference<Map<String, String>>() {});
+        return Pair.of(ret.get("id"), ret.get("api_key"));
+    }
 }
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
index 7df9de2f07..c839a3f090 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
@@ -18,18 +18,18 @@ package org.apache.nifi.elasticsearch.integration;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.compress.utils.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
+import com.github.dockerjava.api.model.ExposedPort;
+import com.github.dockerjava.api.model.HostConfig;
+import com.github.dockerjava.api.model.PortBinding;
+import com.github.dockerjava.api.model.Ports;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHost;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.entity.ContentType;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.nio.entity.NStringEntity;
-import org.apache.nifi.elasticsearch.MapBuilder;
 import org.apache.nifi.util.TestRunner;
 import org.elasticsearch.client.Request;
-import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.client.RestClient;
 import org.junit.jupiter.api.BeforeAll;
@@ -39,30 +39,38 @@ import org.testcontainers.utility.DockerImageName;
 
 import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URL;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
 import static org.apache.http.auth.AuthScope.ANY;
 
 public abstract class AbstractElasticsearchITBase {
     // default Elasticsearch version should (ideally) match that in the 
nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
     protected static final DockerImageName IMAGE = DockerImageName
-            .parse(System.getProperty("elasticsearch.docker.image", 
"docker.elastic.co/elasticsearch/elasticsearch:8.5.0"));
+            .parse(System.getProperty("elasticsearch.docker.image", 
"docker.elastic.co/elasticsearch/elasticsearch:8.6.1"));
     protected static final String ELASTIC_USER_PASSWORD = 
System.getProperty("elasticsearch.elastic_user.password", 
RandomStringUtils.randomAlphanumeric(10, 20));
+    private static final int PORT = 9200;
     protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = 
new ElasticsearchContainer(IMAGE)
             .withPassword(ELASTIC_USER_PASSWORD)
             .withEnv("xpack.security.enabled", "true")
             // enable API Keys for integration-tests (6.x & 7.x don't enable 
SSL and therefore API Keys by default, so use a trial license and explicitly 
enable API Keys)
             .withEnv("xpack.license.self_generated.type", "trial")
-            .withEnv("xpack.security.authc.api_key.enabled", "true");
+            .withEnv("xpack.security.authc.api_key.enabled", "true")
+            // use a "special address" to ensure the publish_host is in the 
bind_host list, otherwise the Sniffer won't work
+            // 
https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#network-interface-values
+            // TestContainers makes Elasticsearch available via 
localhost/127.0.0.1; Elasticsearch uses the IP Address in publish_host
+            .withEnv("network.bind_host", "_local_,_site_")
+            .withEnv("network.publish_host", "127.0.0.1")
+            // pin the Elasticsearch port (typically 9200 but not guaranteed), 
also bind that to 9200 on the host so the network.publish_host is accessible
+            .withEnv("http.port", String.valueOf(PORT))
+            .withExposedPorts(PORT)
+            .withCreateContainerCmdModifier(cmd -> cmd.withHostConfig(
+                    new HostConfig().withPortBindings(new 
PortBinding(Ports.Binding.bindPort(PORT), new ExposedPort(PORT)))
+            ));
     protected static final String CLIENT_SERVICE_NAME = "Client Service";
     protected static final String INDEX = "messages";
 
@@ -88,7 +96,7 @@ public abstract class AbstractElasticsearchITBase {
 
     protected static String type;
 
-    private static RestClient testDataManagementClient;
+    static RestClient testDataManagementClient;
 
     protected static void stopTestcontainer() {
         if (ENABLE_TEST_CONTAINERS) {
@@ -98,7 +106,6 @@ public abstract class AbstractElasticsearchITBase {
 
     @BeforeAll
     static void beforeAll() throws IOException {
-
         startTestcontainer();
         type = getElasticMajorVersion() == 6 ? "_doc" : "";
         System.out.printf("%n%n%n%n%n%n%n%n%n%n%n%n%n%n%nTYPE: %s%nIMAGE: 
%s:%s%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n",
@@ -173,32 +180,6 @@ public abstract class AbstractElasticsearchITBase {
         return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(o);
     }
 
-    protected Pair<String, String> createApiKeyForIndex(String index) throws 
IOException {
-        final String body = prettyJson(new MapBuilder()
-                .of("name", "test-api-key")
-                .of("role_descriptors", new MapBuilder()
-                        .of("test-role", new MapBuilder()
-                                .of("cluster", 
Collections.singletonList("all"))
-                                .of("index", Collections.singletonList(new 
MapBuilder()
-                                        .of("names", 
Collections.singletonList(index))
-                                        .of("privileges", 
Collections.singletonList("all"))
-                                        .build()))
-                                .build())
-                        .build())
-                .build());
-        final String endpoint = String.format("%s/%s", elasticsearchHost, 
"_security/api_key");
-        final Request request = new Request("POST", endpoint);
-        final HttpEntity jsonBody = new NStringEntity(body, 
ContentType.APPLICATION_JSON);
-        request.setEntity(jsonBody);
-
-        final Response response = 
testDataManagementClient.performRequest(request);
-        final InputStream inputStream = response.getEntity().getContent();
-        final byte[] result = IOUtils.toByteArray(inputStream);
-        inputStream.close();
-        final Map<String, String> ret = MAPPER.readValue(new String(result, 
StandardCharsets.UTF_8), Map.class);
-        return Pair.of(ret.get("id"), ret.get("api_key"));
-    }
-
     private static List<SetupAction> readSetupActions(final String scriptPath) 
throws IOException {
         final List<SetupAction> actions = new ArrayList<>();
         try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(Files.newInputStream(Paths.get(scriptPath))))) {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
index 128260533f..f906a9a4cd 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -31,6 +31,17 @@ language governing permissions and limitations under the 
License. -->
         <module>nifi-elasticsearch-restapi-processors</module>
     </modules>
 
+    <properties>
+        <!-- pinned at 7.13.4 as it is the last version prior to Elastic 
forcing the client to check it is connecting
+         to an Elastic-provided Elasticsearch instead of an instance provided 
by someone else (e.g. AWS OpenSearch)
+         see: https://opensearch.org/blog/community/2021/08/community-clients/ 
for more info.
+
+         Note: the low-level elasticsearch-rest-client remains licensed with 
Apache 2.0
+         
(https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_license.html)
 even after the move
+         of the main Elasticsearch product and 
elasticsearch-rest-high-level-client to Elastic 2.0/SSPL 1.0 in v7.11.0+ -->
+        <elasticsearch.client.version>7.13.4</elasticsearch.client.version>
+    </properties>
+
     <dependencyManagement>
         <dependencies>
             <dependency>
@@ -53,14 +64,7 @@ language governing permissions and limitations under the 
License. -->
             <dependency>
                 <groupId>org.elasticsearch.client</groupId>
                 <artifactId>elasticsearch-rest-client</artifactId>
-                <!-- pinned at 7.13.4 as it is the last version prior to 
Elastic forcing the client to check it is connecting
-                 to an Elastic-provided Elasticsearch instead of an instance 
provided by someone else (e.g. AWS OpenSearch)
-                 see: 
https://opensearch.org/blog/community/2021/08/community-clients/ for more info.
-
-                 Note: the low-level elasticsearch-rest-client remains 
licensed with Apache 2.0
-                 
(https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_license.html)
 even after the move
-                 of the main Elasticsearch product and 
elasticsearch-rest-high-level-client to Elastic 2.0/SSPL 1.0 in v7.11.0+ -->
-                <version>7.13.4</version>
+                <version>${elasticsearch.client.version}</version>
                 <scope>compile</scope>
                 <exclusions>
                     <exclusion>
@@ -73,6 +77,18 @@ language governing permissions and limitations under the 
License. -->
                     </exclusion>
                 </exclusions>
             </dependency>
+            <dependency>
+                <groupId>org.elasticsearch.client</groupId>
+                <artifactId>elasticsearch-rest-client-sniffer</artifactId>
+                <version>${elasticsearch.client.version}</version>
+                <scope>compile</scope>
+                <exclusions>
+                    <exclusion>
+                        <groupId>commons-logging</groupId>
+                        <artifactId>commons-logging</artifactId>
+                    </exclusion>
+                </exclusions>
+        </dependency>
         </dependencies>
     </dependencyManagement>
 
@@ -85,7 +101,7 @@ language governing permissions and limitations under the 
License. -->
             </activation>
             <properties>
                 <!-- also update the default Elasticsearch version in 
nifi-elasticsearch-test-utils#src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java-->
-                <elasticsearch_docker_image>8.5.3</elasticsearch_docker_image>
+                <elasticsearch_docker_image>8.6.1</elasticsearch_docker_image>
                 
<elasticsearch.elastic.password>s3cret</elasticsearch.elastic.password>
             </properties>
             <build>
@@ -116,7 +132,7 @@ language governing permissions and limitations under the 
License. -->
         <profile>
             <id>elasticsearch7</id>
             <properties>
-                <elasticsearch_docker_image>7.17.8</elasticsearch_docker_image>
+                <elasticsearch_docker_image>7.17.9</elasticsearch_docker_image>
             </properties>
         </profile>
     </profiles>

Reply via email to