This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f24a34640a NIFI-10831 Added JWT realm authentication for Elasticsearch (#9605)
f24a34640a is described below

commit f24a34640ac1860d90138d0309c9b61c976381ae
Author: Chris Sampson <[email protected]>
AuthorDate: Sat Feb 8 17:44:31 2025 +0000

    NIFI-10831 Added JWT realm authentication for Elasticsearch (#9605)
    
    - Updated ElasticsearchClientService property descriptor dependencies
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi-elasticsearch-client-service-api/pom.xml  |  6 +-
 .../nifi/elasticsearch/AuthorizationScheme.java    |  3 +-
 .../elasticsearch/ElasticSearchClientService.java  | 43 +++++++++--
 .../nifi-elasticsearch-client-service/pom.xml      | 16 +---
 .../ElasticSearchClientServiceImpl.java            | 86 +++++++++++++---------
 .../elasticsearch/ElasticSearchLookupService.java  | 10 +--
 .../integration/ElasticSearchClientService_IT.java |  2 +-
 .../integration/ElasticSearchLookupService_IT.java |  1 +
 .../unit/ElasticSearchClientServiceImplTest.java   | 36 ++++++---
 .../unit/ElasticSearchStringLookupServiceTest.java |  5 +-
 .../nifi-elasticsearch-restapi-processors/pom.xml  |  5 +-
 .../nifi-elasticsearch-test-utils/pom.xml          | 10 ---
 .../integration/AbstractElasticsearchITBase.java   |  2 +-
 .../nifi-elasticsearch-bundle/pom.xml              |  2 +-
 14 files changed, 137 insertions(+), 90 deletions(-)

diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
index d085c54a97..05684a4e94 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
@@ -38,7 +38,11 @@
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
-            <scope>compile</scope>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-oauth2-provider-api</artifactId>
         </dependency>
     </dependencies>
 </project>
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java
index c62e1f22d3..feb31b44c6 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java
@@ -22,7 +22,8 @@ public enum AuthorizationScheme implements DescribedValue {
     NONE("None", "No authorization scheme."),
     PKI("PKI", "Mutual TLS with PKI certificate authorization scheme."),
     BASIC("Basic", "Basic authorization scheme."),
-    API_KEY("API Key", "API key authorization scheme.");
+    API_KEY("API Key", "API key authorization scheme."),
+    JWT("JWT", "JWT realm scheme.");
 
     private final String displayName;
     private final String description;
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
index 77e819332a..b1c64cc12d 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
@@ -22,6 +22,7 @@ import org.apache.nifi.components.Validator;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.VerifiableControllerService;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.proxy.ProxyConfiguration;
 import org.apache.nifi.proxy.ProxySpec;
@@ -50,6 +51,7 @@ public interface ElasticSearchClientService extends 
ControllerService, Verifiabl
             .identifiesControllerService(SSLContextProvider.class)
             .addValidator(Validator.VALID)
             .build();
+
     PropertyDescriptor PROXY_CONFIGURATION_SERVICE = 
ProxyConfiguration.createProxyConfigPropertyDescriptor(ProxySpec.HTTP);
 
     PropertyDescriptor AUTHORIZATION_SCHEME = new PropertyDescriptor.Builder()
@@ -62,12 +64,41 @@ public interface ElasticSearchClientService extends 
ControllerService, Verifiabl
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    PropertyDescriptor OAUTH2_ACCESS_TOKEN_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("el-cs-oauth2-token-provider")
+            .displayName("OAuth2 Access Token Provider")
+            .description("The OAuth2 Access Token Provider used to provide 
JWTs for Bearer Token Authorization with Elasticsearch.")
+            .dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.JWT)
+            .required(true)
+            .identifiesControllerService(OAuth2AccessTokenProvider.class)
+            .addValidator(Validator.VALID)
+            .build();
+
+    PropertyDescriptor JWT_SHARED_SECRET = new PropertyDescriptor.Builder()
+            .name("jwt-shared-secret")
+            .displayName("JWT Shared Secret")
+            .description("JWT realm Shared Secret.")
+            .dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.JWT)
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    PropertyDescriptor RUN_AS_USER = new PropertyDescriptor.Builder()
+            .name("el-cs-run-as-user")
+            .displayName("Run As User")
+            .description("The username to impersonate within Elasticsearch.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
     PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
             .name("el-cs-username")
             .displayName("Username")
             .description("The username to use with XPack security.")
             .dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC)
-            .required(false)
+            .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
@@ -77,7 +108,7 @@ public interface ElasticSearchClientService extends 
ControllerService, Verifiabl
             .displayName("Password")
             .description("The password to use with XPack security.")
             .dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC)
-            .required(false)
+            .required(true)
             .sensitive(true)
             .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -88,7 +119,7 @@ public interface ElasticSearchClientService extends 
ControllerService, Verifiabl
             .displayName("API Key ID")
             .description("Unique identifier of the API key.")
             .dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY)
-            .required(false)
+            .required(true)
             .sensitive(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
@@ -98,7 +129,7 @@ public interface ElasticSearchClientService extends 
ControllerService, Verifiabl
             .displayName("API Key")
             .description("Encoded API key.")
             .dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY)
-            .required(false)
+            .required(true)
             .sensitive(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
@@ -220,7 +251,7 @@ public interface ElasticSearchClientService extends 
ControllerService, Verifiabl
             .name("el-cs-sniff-failure")
             .displayName("Sniff on Failure")
             .description("Enable sniffing on failure, meaning that after each 
failure the Elasticsearch nodes list gets updated " +
-                    "straightaway rather than at the following ordinary 
sniffing round")
+                    "straight away rather than at the following ordinary 
sniffing round")
             .dependsOn(SNIFF_CLUSTER_NODES, "true")
             .allowableValues("true", "false")
             .defaultValue("false")
@@ -370,7 +401,7 @@ public interface ElasticSearchClientService extends 
ControllerService, Verifiabl
     /**
      * Perform a search using the JSON DSL.
      *
-     * @param query A JSON string reprensenting the query.
+     * @param query A JSON string representing the query.
      * @param index The index to target. Optional.
      * @param type The type to target. Optional. Will not be used in future 
versions of Elasticsearch.
      * @param requestParameters A collection of URL request parameters. 
Optional.
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
index dc2ab871df..3eb6532e47 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
@@ -47,6 +47,10 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-proxy-configuration-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-oauth2-provider-api</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-elasticsearch-client-service-api</artifactId>
@@ -83,23 +87,11 @@
             <artifactId>jackson-annotations</artifactId>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-compress</artifactId>
-        </dependency>
         <dependency>
             <groupId>com.github.stephenc.findbugs</groupId>
             <artifactId>findbugs-annotations</artifactId>
             <version>1.3.9-1</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-lang3</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.opentest4j</groupId>
             <artifactId>opentest4j</artifactId>
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index b3e6f7a1f4..9bb3414733 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -45,6 +45,7 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.proxy.ProxyConfiguration;
@@ -56,6 +57,7 @@ import org.apache.nifi.util.StringUtils;
 import org.elasticsearch.client.Node;
 import org.elasticsearch.client.NodeSelector;
 import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.client.RestClient;
@@ -110,6 +112,9 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
             PASSWORD,
             API_KEY_ID,
             API_KEY,
+            JWT_SHARED_SECRET,
+            OAUTH2_ACCESS_TOKEN_PROVIDER,
+            RUN_AS_USER,
             PROP_SSL_CONTEXT_SERVICE,
             PROXY_CONFIGURATION_SERVICE,
             CONNECT_TIMEOUT,
@@ -127,6 +132,8 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
             SNIFFER_FAILURE_DELAY
     );
 
+    private OAuth2AccessTokenProvider oAuth2AccessTokenProvider;
+
     private RestClient client;
 
     private Sniffer sniffer;
@@ -157,12 +164,6 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
 
         final AuthorizationScheme authorizationScheme = 
validationContext.getProperty(AUTHORIZATION_SCHEME).asAllowableValue(AuthorizationScheme.class);
 
-        final boolean usernameSet = 
validationContext.getProperty(USERNAME).isSet();
-        final boolean passwordSet = 
validationContext.getProperty(PASSWORD).isSet();
-
-        final boolean apiKeyIdSet = 
validationContext.getProperty(API_KEY_ID).isSet();
-        final boolean apiKeySet = 
validationContext.getProperty(API_KEY).isSet();
-
         final SSLContextProvider sslContextProvider = 
validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextProvider.class);
 
         if (authorizationScheme == AuthorizationScheme.PKI && 
(sslContextProvider == null)) {
@@ -173,18 +174,6 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
             );
         }
 
-        if (usernameSet && !passwordSet) {
-            addAuthorizationPropertiesValidationIssue(results, USERNAME, 
PASSWORD);
-        } else if (passwordSet && !usernameSet) {
-            addAuthorizationPropertiesValidationIssue(results, PASSWORD, 
USERNAME);
-        }
-
-        if (apiKeyIdSet && !apiKeySet) {
-            addAuthorizationPropertiesValidationIssue(results, API_KEY_ID, 
API_KEY);
-        } else if (apiKeySet && !apiKeyIdSet) {
-            addAuthorizationPropertiesValidationIssue(results, API_KEY, 
API_KEY_ID);
-        }
-
         final boolean sniffClusterNodes = 
validationContext.getProperty(SNIFF_CLUSTER_NODES).asBoolean();
         final boolean sniffOnFailure = 
validationContext.getProperty(SNIFF_ON_FAILURE).asBoolean();
         if (sniffOnFailure && !sniffClusterNodes) {
@@ -195,23 +184,17 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
         return results;
     }
 
-    private void addAuthorizationPropertiesValidationIssue(final 
List<ValidationResult> results, final PropertyDescriptor presentProperty, final 
PropertyDescriptor missingProperty) {
-        results.add(new 
ValidationResult.Builder().subject(missingProperty.getName()).valid(false)
-                .explanation(String.format("if '%s' is then '%s' must be 
set.", presentProperty.getDisplayName(), missingProperty.getDisplayName()))
-                .build()
-        );
-    }
-
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) throws 
InitializationException {
         try {
             this.client = setupClient(context);
             this.sniffer = setupSniffer(context, this.client);
-            responseCharset = 
Charset.forName(context.getProperty(CHARSET).getValue());
+            this.responseCharset = 
Charset.forName(context.getProperty(CHARSET).getValue());
+
+            this.oAuth2AccessTokenProvider = 
context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
 
             // re-create the ObjectMapper in case the SUPPRESS_NULLS property 
has changed - the JsonInclude settings aren't dynamic
             createObjectMapper(context);
-
         } catch (final Exception ex) {
             getLogger().error("Could not initialize ElasticSearch client.", 
ex);
             throw new InitializationException(ex);
@@ -264,10 +247,11 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
             
clientSetupResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL);
 
             // try to fetch the Elasticsearch root endpoint (system summary)
-            verifyRootConnection(verifyClient, connectionResult, 
warningsResult);
+            final OAuth2AccessTokenProvider tokenProvider = 
context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+            verifyRootConnection(verifyClient, tokenProvider, 
connectionResult, warningsResult);
 
             // try sniffing for cluster nodes
-            verifySniffer(context, verifyClient, snifferResult);
+            verifySniffer(context, verifyClient, tokenProvider, snifferResult);
         } catch (final MalformedURLException mue) {
             clientSetupResult.outcome(ConfigVerificationResult.Outcome.FAILED)
                     .explanation("Incorrect/invalid " + 
ElasticSearchClientService.HTTP_HOSTS.getDisplayName());
@@ -303,7 +287,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
         return results;
     }
 
-    private void verifySniffer(final ConfigurationContext context, final 
RestClient verifyClient, final ConfigVerificationResult.Builder snifferResult) {
+    private void verifySniffer(final ConfigurationContext context, final 
RestClient verifyClient, final OAuth2AccessTokenProvider tokenProvider, final 
ConfigVerificationResult.Builder snifferResult) {
         try (final Sniffer verifySniffer = setupSniffer(context, 
verifyClient)) {
             if (verifySniffer != null) {
                 final List<Node> originalNodes = verifyClient.getNodes();
@@ -317,7 +301,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
                 nodes.forEach(n -> {
                     try {
                         verifyClient.setNodes(Collections.singletonList(n));
-                        final List<String> warnings = 
getElasticsearchRoot(verifyClient);
+                        final List<String> warnings = 
getElasticsearchRoot(verifyClient, tokenProvider);
                         successfulInstances.getAndIncrement();
                         if (!warnings.isEmpty()) {
                             warningInstances.getAndIncrement();
@@ -351,17 +335,20 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
         }
     }
 
-    private List<String> getElasticsearchRoot(final RestClient verifyClient) 
throws IOException {
-        final Response response = verifyClient.performRequest(new 
Request("GET", "/"));
+    private List<String> getElasticsearchRoot(final RestClient verifyClient, 
final OAuth2AccessTokenProvider tokenProvider) throws IOException {
+        final Request request = addJWTAuthorizationHeader(new Request("GET", 
"/"), tokenProvider);
+        final Response response = verifyClient.performRequest(request);
         final List<String> warnings = parseResponseWarningHeaders(response);
+        // ensure the response can be parsed without exception
         parseResponse(response);
 
         return warnings;
     }
 
-    private void verifyRootConnection(final RestClient verifyClient, final 
ConfigVerificationResult.Builder connectionResult, final 
ConfigVerificationResult.Builder warningsResult) {
+    private void verifyRootConnection(final RestClient verifyClient, final 
OAuth2AccessTokenProvider tokenProvider,
+                                      final ConfigVerificationResult.Builder 
connectionResult, final ConfigVerificationResult.Builder warningsResult) {
         try {
-            final List<String> warnings = getElasticsearchRoot(verifyClient);
+            final List<String> warnings = getElasticsearchRoot(verifyClient, 
tokenProvider);
 
             
connectionResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL);
             if (warnings.isEmpty()) {
@@ -439,9 +426,13 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
         final String username = 
context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
         final String password = 
context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
 
+        final String runAsUser = 
context.getProperty(RUN_AS_USER).evaluateAttributeExpressions().getValue();
+
         final String apiKeyId = context.getProperty(API_KEY_ID).getValue();
         final String apiKey = context.getProperty(API_KEY).getValue();
 
+        final String jwtSharedSecret = 
context.getProperty(JWT_SHARED_SECRET).getValue();
+
         final SSLContext sslContext = getSSLContext(context);
         final ProxyConfigurationService proxyConfigurationService = 
context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
 
@@ -459,6 +450,12 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
             if (AuthorizationScheme.API_KEY == authorizationScheme && apiKeyId 
!= null && apiKey != null) {
                 defaultHeaders.add(createApiKeyAuthorizationHeader(apiKeyId, 
apiKey));
             }
+            if (AuthorizationScheme.JWT == authorizationScheme && 
jwtSharedSecret != null) {
+                defaultHeaders.add(createSharedSecretHeader(jwtSharedSecret));
+            }
+            if (runAsUser != null) {
+                defaultHeaders.add(createRunAsUserHeader(runAsUser));
+            }
             if (!defaultHeaders.isEmpty()) {
                 builder.setDefaultHeaders(defaultHeaders.toArray(new 
Header[0]));
             }
@@ -524,6 +521,23 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
         return new BasicHeader("Authorization", "ApiKey " + apiKeyAuth);
     }
 
+    private BasicHeader createSharedSecretHeader(final String jwtSharedSecret) 
{
+        return new BasicHeader("ES-Client-Authentication", "sharedsecret " + 
jwtSharedSecret);
+    }
+
+    private BasicHeader createRunAsUserHeader(final String runAsUser) {
+        return new BasicHeader("es-security-runas-user", runAsUser);
+    }
+
+    private Request addJWTAuthorizationHeader(final Request request, final 
OAuth2AccessTokenProvider tokenProvider) {
+        if (tokenProvider != null) {
+            final RequestOptions.Builder requestOptionsBuilder = 
RequestOptions.DEFAULT.toBuilder();
+            requestOptionsBuilder.addHeader("Authorization", "Bearer " + 
tokenProvider.getAccessDetails().getAccessToken());
+            request.setOptions(requestOptionsBuilder.build());
+        }
+        return request;
+    }
+
     private Sniffer setupSniffer(final ConfigurationContext context, final 
RestClient restClient) {
         final boolean sniffClusterNodes = 
context.getProperty(SNIFF_CLUSTER_NODES).asBoolean();
         final int snifferIntervalMillis = 
context.getProperty(SNIFFER_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
@@ -1016,7 +1030,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
     }
 
     private Response performRequest(final String method, final String 
endpoint, final Map<String, String> parameters, final HttpEntity entity) throws 
IOException {
-        final Request request = new Request(method, endpoint);
+        final Request request = addJWTAuthorizationHeader(new Request(method, 
endpoint), oAuth2AccessTokenProvider);
         if (parameters != null && !parameters.isEmpty()) {
             request.addParameters(parameters);
         }
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
index 38b3997562..b9f0d7dd23 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
@@ -209,12 +209,12 @@ public class ElasticSearchLookupService extends 
JsonInferenceSchemaRegistryServi
             return null;
         }
 
-        final Map<String, Object> source = (Map<String, Object>) 
response.getHits().get(0).get("_source");
+        final Map<String, Object> source = (Map<String, Object>) 
response.getHits().getFirst().get("_source");
 
         final RecordSchema toUse = getSchema(context, source, null);
 
         Record record = new MapRecord(toUse, source);
-        if (recordPathMappings.size() > 0) {
+        if (!recordPathMappings.isEmpty()) {
             record = applyMappings(record, source);
         }
 
@@ -239,7 +239,7 @@ public class ElasticSearchLookupService extends 
JsonInferenceSchemaRegistryServi
                                     put(e.getKey(), e.getValue());
                                 }});
                             }
-                        }}).collect(Collectors.toList())
+                        }}).toList()
                 );
             }});
         }};
@@ -256,10 +256,10 @@ public class ElasticSearchLookupService extends 
JsonInferenceSchemaRegistryServi
             if (response.getNumberOfHits() == 0) {
                 return null;
             } else {
-                final Map<String, Object> source = (Map<String, Object>) 
response.getHits().get(0).get("_source");
+                final Map<String, Object> source = (Map<String, Object>) 
response.getHits().getFirst().get("_source");
                 final RecordSchema toUse = getSchema(context, source, null);
                 Record record = new MapRecord(toUse, source);
-                if (recordPathMappings.size() > 0) {
+                if (!recordPathMappings.isEmpty()) {
                     record = applyMappings(record, source);
                 }
 
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
index 2e79220847..fe6c637acc 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
@@ -263,7 +263,7 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
                         "four", 4, "five", 5)
                 .build();
 
-        buckets.forEach( (aggRes) -> {
+        buckets.forEach(aggRes -> {
             final String key = (String) aggRes.get("key");
             final Integer docCount = (Integer) aggRes.get("doc_count");
             assertEquals(expected.get(key), docCount, String.format("%s did 
not match.", key));
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java
index 8129e999d5..3fc0b5076f 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java
@@ -49,6 +49,7 @@ class ElasticSearchLookupService_IT extends 
AbstractElasticsearch_IT {
 
     private ElasticSearchLookupService lookupService;
 
+    @Override
     @BeforeEach
     void before() throws Exception {
         super.before();
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchClientServiceImplTest.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchClientServiceImplTest.java
index 9fdbcee8cd..98fc5cd674 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchClientServiceImplTest.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchClientServiceImplTest.java
@@ -21,6 +21,7 @@ import org.apache.nifi.elasticsearch.AuthorizationScheme;
 import org.apache.nifi.elasticsearch.ElasticSearchClientService;
 import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl;
 import org.apache.nifi.elasticsearch.TestControllerServiceProcessor;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.ssl.SSLContextProvider;
 import org.apache.nifi.util.TestRunner;
@@ -72,14 +73,11 @@ class ElasticSearchClientServiceImplTest {
         runner.assertValid(service);
 
         runner.removeProperty(service, ElasticSearchClientService.PASSWORD);
-        
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.USERNAME,
 ElasticSearchClientService.PASSWORD);
-
-        runner.removeProperty(service, ElasticSearchClientService.USERNAME);
-        runner.assertValid(service);
+        
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.PASSWORD);
 
         runner.setProperty(service, ElasticSearchClientService.PASSWORD, 
"password");
         runner.removeProperty(service, ElasticSearchClientService.USERNAME);
-        
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.PASSWORD,
 ElasticSearchClientService.USERNAME);
+        
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.USERNAME);
     }
 
     @Test
@@ -90,14 +88,11 @@ class ElasticSearchClientServiceImplTest {
         runner.assertValid(service);
 
         runner.removeProperty(service, ElasticSearchClientService.API_KEY_ID);
-        
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.API_KEY,
 ElasticSearchClientService.API_KEY_ID);
-
-        runner.removeProperty(service, ElasticSearchClientService.API_KEY);
-        runner.assertValid(service);
+        
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.API_KEY_ID);
 
         runner.setProperty(service, ElasticSearchClientService.API_KEY_ID, 
"api-key-id");
         runner.removeProperty(service, ElasticSearchClientService.API_KEY);
-        
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.API_KEY_ID,
 ElasticSearchClientService.API_KEY);
+        
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.API_KEY);
     }
 
     @Test
@@ -114,9 +109,26 @@ class ElasticSearchClientServiceImplTest {
         assertPKIAuthorizationValidationErrorMessage();
     }
 
-    private void assertAuthorizationPropertyValidationErrorMessage(final 
PropertyDescriptor presentProperty, final PropertyDescriptor missingProperty) {
+    @Test
+    void testValidateJwtAuth() throws InitializationException {
+        runner.setProperty(service, 
ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.JWT);
+        runner.setProperty(service, 
ElasticSearchClientService.JWT_SHARED_SECRET, "jwt-shared-secret");
+        
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.OAUTH2_ACCESS_TOKEN_PROVIDER);
+
+        final OAuth2AccessTokenProvider oAuth2AccessTokenProvider = 
mock(OAuth2AccessTokenProvider.class);
+        
when(oAuth2AccessTokenProvider.getIdentifier()).thenReturn("oauth2-access-token-provider");
+        runner.addControllerService("oauth2-access-token-provider", 
oAuth2AccessTokenProvider);
+        runner.setProperty(service, 
ElasticSearchClientService.OAUTH2_ACCESS_TOKEN_PROVIDER, 
"oauth2-access-token-provider");
+        runner.assertValid(service);
+
+        runner.removeProperty(service, 
ElasticSearchClientService.JWT_SHARED_SECRET);
+        
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.JWT_SHARED_SECRET);
+    }
+
+    private void assertAuthorizationPropertyValidationErrorMessage(final 
PropertyDescriptor missingProperty) {
         final AssertionFailedError afe = 
assertThrows(AssertionFailedError.class, () -> runner.assertValid(service));
-        assertTrue(afe.getMessage().contains(String.format("if '%s' is then 
'%s' must be set.", presentProperty.getDisplayName(), 
missingProperty.getDisplayName())));
+        final String expectedMessage = String.format("%s is required", 
missingProperty.getDisplayName());
+        assertTrue(afe.getMessage().contains(expectedMessage), 
String.format("Validation error message \"%s\" does not contain \"%s\"", 
afe.getMessage(), expectedMessage));
     }
 
     private void assertPKIAuthorizationValidationErrorMessage() {
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchStringLookupServiceTest.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchStringLookupServiceTest.java
index 16971faabf..7e19c90ca8 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchStringLookupServiceTest.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchStringLookupServiceTest.java
@@ -35,7 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class ElasticSearchStringLookupServiceTest {
+class ElasticSearchStringLookupServiceTest {
     private ElasticSearchClientService mockClientService;
     private ElasticSearchStringLookupService lookupService;
 
@@ -56,8 +56,9 @@ public class ElasticSearchStringLookupServiceTest {
         runner.enableControllerService(lookupService);
     }
 
+    @SuppressWarnings("unchecked")
     @Test
-    public void simpleLookupTest() throws Exception {
+    void simpleLookupTest() throws Exception {
         Map<String, Object> coordinates = new HashMap<>();
         coordinates.put(ElasticSearchStringLookupService.ID, "12345");
 
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
index ed7fda19e8..2a1b71a43a 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
@@ -72,8 +72,9 @@ language governing permissions and limitations under the 
License. -->
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-oauth2-provider-api</artifactId>
+            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/pom.xml
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/pom.xml
index 1635b30ac7..cf760f6a9e 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/pom.xml
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/pom.xml
@@ -21,16 +21,6 @@ language governing permissions and limitations under the 
License. -->
     <packaging>jar</packaging>
 
     <dependencies>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-            <scope>compile</scope>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
-            <scope>compile</scope>
-        </dependency>
         <dependency>
             <groupId>com.github.docker-java</groupId>
             <artifactId>docker-java-api</artifactId>
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
index ae3db3cc07..163b8fad99 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
@@ -52,7 +52,7 @@ import static org.apache.http.auth.AuthScope.ANY;
 public abstract class AbstractElasticsearchITBase {
     // default Elasticsearch version should (ideally) match that in the 
nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
     protected static final DockerImageName IMAGE = DockerImageName
-            .parse(System.getProperty("elasticsearch.docker.image", 
"docker.elastic.co/elasticsearch/elasticsearch:8.15.1"));
+            .parse(System.getProperty("elasticsearch.docker.image", 
"docker.elastic.co/elasticsearch/elasticsearch:8.17.0"));
     protected static final String ELASTIC_USER_PASSWORD = 
System.getProperty("elasticsearch.elastic_user.password", 
RandomStringUtils.randomAlphanumeric(10, 20));
     private static final int PORT = 9200;
     protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = 
new ElasticsearchContainer(IMAGE)
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
index 2f5a552451..dd7f1c4153 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -114,7 +114,7 @@ language governing permissions and limitations under the 
License. -->
         <profile>
             <id>elasticsearch7</id>
             <properties>
-                
<elasticsearch_docker_image>7.17.23</elasticsearch_docker_image>
+                
<elasticsearch_docker_image>7.17.26</elasticsearch_docker_image>
             </properties>
         </profile>
     </profiles>


Reply via email to