This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f24a34640a NIFI-10831 Added JWT realm authentication for Elasticsearch (#9605)
f24a34640a is described below
commit f24a34640ac1860d90138d0309c9b61c976381ae
Author: Chris Sampson <[email protected]>
AuthorDate: Sat Feb 8 17:44:31 2025 +0000
NIFI-10831 Added JWT realm authentication for Elasticsearch (#9605)
- Updated ElasticsearchClientService property descriptor dependencies
Signed-off-by: David Handermann <[email protected]>
---
.../nifi-elasticsearch-client-service-api/pom.xml | 6 +-
.../nifi/elasticsearch/AuthorizationScheme.java | 3 +-
.../elasticsearch/ElasticSearchClientService.java | 43 +++++++++--
.../nifi-elasticsearch-client-service/pom.xml | 16 +---
.../ElasticSearchClientServiceImpl.java | 86 +++++++++++++---------
.../elasticsearch/ElasticSearchLookupService.java | 10 +--
.../integration/ElasticSearchClientService_IT.java | 2 +-
.../integration/ElasticSearchLookupService_IT.java | 1 +
.../unit/ElasticSearchClientServiceImplTest.java | 36 ++++++---
.../unit/ElasticSearchStringLookupServiceTest.java | 5 +-
.../nifi-elasticsearch-restapi-processors/pom.xml | 5 +-
.../nifi-elasticsearch-test-utils/pom.xml | 10 ---
.../integration/AbstractElasticsearchITBase.java | 2 +-
.../nifi-elasticsearch-bundle/pom.xml | 2 +-
14 files changed, 137 insertions(+), 90 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
index d085c54a97..05684a4e94 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
@@ -38,7 +38,11 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
- <scope>compile</scope>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-oauth2-provider-api</artifactId>
</dependency>
</dependencies>
</project>
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java
index c62e1f22d3..feb31b44c6 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java
@@ -22,7 +22,8 @@ public enum AuthorizationScheme implements DescribedValue {
NONE("None", "No authorization scheme."),
PKI("PKI", "Mutual TLS with PKI certificate authorization scheme."),
BASIC("Basic", "Basic authorization scheme."),
- API_KEY("API Key", "API key authorization scheme.");
+ API_KEY("API Key", "API key authorization scheme."),
+ JWT("JWT", "JWT realm scheme.");
private final String displayName;
private final String description;
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
index 77e819332a..b1c64cc12d 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
@@ -22,6 +22,7 @@ import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.VerifiableControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
@@ -50,6 +51,7 @@ public interface ElasticSearchClientService extends
ControllerService, Verifiabl
.identifiesControllerService(SSLContextProvider.class)
.addValidator(Validator.VALID)
.build();
+
PropertyDescriptor PROXY_CONFIGURATION_SERVICE =
ProxyConfiguration.createProxyConfigPropertyDescriptor(ProxySpec.HTTP);
PropertyDescriptor AUTHORIZATION_SCHEME = new PropertyDescriptor.Builder()
@@ -62,12 +64,41 @@ public interface ElasticSearchClientService extends
ControllerService, Verifiabl
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ PropertyDescriptor OAUTH2_ACCESS_TOKEN_PROVIDER = new
PropertyDescriptor.Builder()
+ .name("el-cs-oauth2-token-provider")
+ .displayName("OAuth2 Access Token Provider")
+ .description("The OAuth2 Access Token Provider used to provide
JWTs for Bearer Token Authorization with Elasticsearch.")
+ .dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.JWT)
+ .required(true)
+ .identifiesControllerService(OAuth2AccessTokenProvider.class)
+ .addValidator(Validator.VALID)
+ .build();
+
+ PropertyDescriptor JWT_SHARED_SECRET = new PropertyDescriptor.Builder()
+ .name("jwt-shared-secret")
+ .displayName("JWT Shared Secret")
+ .description("JWT realm Shared Secret.")
+ .dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.JWT)
+ .required(true)
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ PropertyDescriptor RUN_AS_USER = new PropertyDescriptor.Builder()
+ .name("el-cs-run-as-user")
+ .displayName("Run As User")
+ .description("The username to impersonate within Elasticsearch.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("el-cs-username")
.displayName("Username")
.description("The username to use with XPack security.")
.dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC)
- .required(false)
+ .required(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@@ -77,7 +108,7 @@ public interface ElasticSearchClientService extends
ControllerService, Verifiabl
.displayName("Password")
.description("The password to use with XPack security.")
.dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC)
- .required(false)
+ .required(true)
.sensitive(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -88,7 +119,7 @@ public interface ElasticSearchClientService extends
ControllerService, Verifiabl
.displayName("API Key ID")
.description("Unique identifier of the API key.")
.dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY)
- .required(false)
+ .required(true)
.sensitive(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@@ -98,7 +129,7 @@ public interface ElasticSearchClientService extends
ControllerService, Verifiabl
.displayName("API Key")
.description("Encoded API key.")
.dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY)
- .required(false)
+ .required(true)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@@ -220,7 +251,7 @@ public interface ElasticSearchClientService extends
ControllerService, Verifiabl
.name("el-cs-sniff-failure")
.displayName("Sniff on Failure")
.description("Enable sniffing on failure, meaning that after each
failure the Elasticsearch nodes list gets updated " +
- "straightaway rather than at the following ordinary
sniffing round")
+ "straight away rather than at the following ordinary
sniffing round")
.dependsOn(SNIFF_CLUSTER_NODES, "true")
.allowableValues("true", "false")
.defaultValue("false")
@@ -370,7 +401,7 @@ public interface ElasticSearchClientService extends
ControllerService, Verifiabl
/**
* Perform a search using the JSON DSL.
*
- * @param query A JSON string reprensenting the query.
+ * @param query A JSON string representing the query.
* @param index The index to target. Optional.
* @param type The type to target. Optional. Will not be used in future
versions of Elasticsearch.
* @param requestParameters A collection of URL request parameters.
Optional.
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
index dc2ab871df..3eb6532e47 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
@@ -47,6 +47,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-oauth2-provider-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-elasticsearch-client-service-api</artifactId>
@@ -83,23 +87,11 @@
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- </dependency>
<dependency>
<groupId>com.github.stephenc.findbugs</groupId>
<artifactId>findbugs-annotations</artifactId>
<version>1.3.9-1</version>
</dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
<dependency>
<groupId>org.opentest4j</groupId>
<artifactId>opentest4j</artifactId>
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index b3e6f7a1f4..9bb3414733 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -45,6 +45,7 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
@@ -56,6 +57,7 @@ import org.apache.nifi.util.StringUtils;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.NodeSelector;
import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
@@ -110,6 +112,9 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
PASSWORD,
API_KEY_ID,
API_KEY,
+ JWT_SHARED_SECRET,
+ OAUTH2_ACCESS_TOKEN_PROVIDER,
+ RUN_AS_USER,
PROP_SSL_CONTEXT_SERVICE,
PROXY_CONFIGURATION_SERVICE,
CONNECT_TIMEOUT,
@@ -127,6 +132,8 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
SNIFFER_FAILURE_DELAY
);
+ private OAuth2AccessTokenProvider oAuth2AccessTokenProvider;
+
private RestClient client;
private Sniffer sniffer;
@@ -157,12 +164,6 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
final AuthorizationScheme authorizationScheme =
validationContext.getProperty(AUTHORIZATION_SCHEME).asAllowableValue(AuthorizationScheme.class);
- final boolean usernameSet =
validationContext.getProperty(USERNAME).isSet();
- final boolean passwordSet =
validationContext.getProperty(PASSWORD).isSet();
-
- final boolean apiKeyIdSet =
validationContext.getProperty(API_KEY_ID).isSet();
- final boolean apiKeySet =
validationContext.getProperty(API_KEY).isSet();
-
final SSLContextProvider sslContextProvider =
validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextProvider.class);
if (authorizationScheme == AuthorizationScheme.PKI &&
(sslContextProvider == null)) {
@@ -173,18 +174,6 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
);
}
- if (usernameSet && !passwordSet) {
- addAuthorizationPropertiesValidationIssue(results, USERNAME,
PASSWORD);
- } else if (passwordSet && !usernameSet) {
- addAuthorizationPropertiesValidationIssue(results, PASSWORD,
USERNAME);
- }
-
- if (apiKeyIdSet && !apiKeySet) {
- addAuthorizationPropertiesValidationIssue(results, API_KEY_ID,
API_KEY);
- } else if (apiKeySet && !apiKeyIdSet) {
- addAuthorizationPropertiesValidationIssue(results, API_KEY,
API_KEY_ID);
- }
-
final boolean sniffClusterNodes =
validationContext.getProperty(SNIFF_CLUSTER_NODES).asBoolean();
final boolean sniffOnFailure =
validationContext.getProperty(SNIFF_ON_FAILURE).asBoolean();
if (sniffOnFailure && !sniffClusterNodes) {
@@ -195,23 +184,17 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
return results;
}
- private void addAuthorizationPropertiesValidationIssue(final
List<ValidationResult> results, final PropertyDescriptor presentProperty, final
PropertyDescriptor missingProperty) {
- results.add(new
ValidationResult.Builder().subject(missingProperty.getName()).valid(false)
- .explanation(String.format("if '%s' is then '%s' must be
set.", presentProperty.getDisplayName(), missingProperty.getDisplayName()))
- .build()
- );
- }
-
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws
InitializationException {
try {
this.client = setupClient(context);
this.sniffer = setupSniffer(context, this.client);
- responseCharset =
Charset.forName(context.getProperty(CHARSET).getValue());
+ this.responseCharset =
Charset.forName(context.getProperty(CHARSET).getValue());
+
+ this.oAuth2AccessTokenProvider =
context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
// re-create the ObjectMapper in case the SUPPRESS_NULLS property
has changed - the JsonInclude settings aren't dynamic
createObjectMapper(context);
-
} catch (final Exception ex) {
getLogger().error("Could not initialize ElasticSearch client.",
ex);
throw new InitializationException(ex);
@@ -264,10 +247,11 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
clientSetupResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL);
// try to fetch the Elasticsearch root endpoint (system summary)
- verifyRootConnection(verifyClient, connectionResult,
warningsResult);
+ final OAuth2AccessTokenProvider tokenProvider =
context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+ verifyRootConnection(verifyClient, tokenProvider,
connectionResult, warningsResult);
// try sniffing for cluster nodes
- verifySniffer(context, verifyClient, snifferResult);
+ verifySniffer(context, verifyClient, tokenProvider, snifferResult);
} catch (final MalformedURLException mue) {
clientSetupResult.outcome(ConfigVerificationResult.Outcome.FAILED)
.explanation("Incorrect/invalid " +
ElasticSearchClientService.HTTP_HOSTS.getDisplayName());
@@ -303,7 +287,7 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
return results;
}
- private void verifySniffer(final ConfigurationContext context, final
RestClient verifyClient, final ConfigVerificationResult.Builder snifferResult) {
+ private void verifySniffer(final ConfigurationContext context, final
RestClient verifyClient, final OAuth2AccessTokenProvider tokenProvider, final
ConfigVerificationResult.Builder snifferResult) {
try (final Sniffer verifySniffer = setupSniffer(context,
verifyClient)) {
if (verifySniffer != null) {
final List<Node> originalNodes = verifyClient.getNodes();
@@ -317,7 +301,7 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
nodes.forEach(n -> {
try {
verifyClient.setNodes(Collections.singletonList(n));
- final List<String> warnings =
getElasticsearchRoot(verifyClient);
+ final List<String> warnings =
getElasticsearchRoot(verifyClient, tokenProvider);
successfulInstances.getAndIncrement();
if (!warnings.isEmpty()) {
warningInstances.getAndIncrement();
@@ -351,17 +335,20 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
}
}
- private List<String> getElasticsearchRoot(final RestClient verifyClient)
throws IOException {
- final Response response = verifyClient.performRequest(new
Request("GET", "/"));
+ private List<String> getElasticsearchRoot(final RestClient verifyClient,
final OAuth2AccessTokenProvider tokenProvider) throws IOException {
+ final Request request = addJWTAuthorizationHeader(new Request("GET",
"/"), tokenProvider);
+ final Response response = verifyClient.performRequest(request);
final List<String> warnings = parseResponseWarningHeaders(response);
+ // ensure the response can be parsed without exception
parseResponse(response);
return warnings;
}
- private void verifyRootConnection(final RestClient verifyClient, final
ConfigVerificationResult.Builder connectionResult, final
ConfigVerificationResult.Builder warningsResult) {
+ private void verifyRootConnection(final RestClient verifyClient, final
OAuth2AccessTokenProvider tokenProvider,
+ final ConfigVerificationResult.Builder
connectionResult, final ConfigVerificationResult.Builder warningsResult) {
try {
- final List<String> warnings = getElasticsearchRoot(verifyClient);
+ final List<String> warnings = getElasticsearchRoot(verifyClient,
tokenProvider);
connectionResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL);
if (warnings.isEmpty()) {
@@ -439,9 +426,13 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
final String username =
context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
final String password =
context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+ final String runAsUser =
context.getProperty(RUN_AS_USER).evaluateAttributeExpressions().getValue();
+
final String apiKeyId = context.getProperty(API_KEY_ID).getValue();
final String apiKey = context.getProperty(API_KEY).getValue();
+ final String jwtSharedSecret =
context.getProperty(JWT_SHARED_SECRET).getValue();
+
final SSLContext sslContext = getSSLContext(context);
final ProxyConfigurationService proxyConfigurationService =
context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
@@ -459,6 +450,12 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
if (AuthorizationScheme.API_KEY == authorizationScheme && apiKeyId
!= null && apiKey != null) {
defaultHeaders.add(createApiKeyAuthorizationHeader(apiKeyId,
apiKey));
}
+ if (AuthorizationScheme.JWT == authorizationScheme &&
jwtSharedSecret != null) {
+ defaultHeaders.add(createSharedSecretHeader(jwtSharedSecret));
+ }
+ if (runAsUser != null) {
+ defaultHeaders.add(createRunAsUserHeader(runAsUser));
+ }
if (!defaultHeaders.isEmpty()) {
builder.setDefaultHeaders(defaultHeaders.toArray(new
Header[0]));
}
@@ -524,6 +521,23 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
return new BasicHeader("Authorization", "ApiKey " + apiKeyAuth);
}
+ private BasicHeader createSharedSecretHeader(final String jwtSharedSecret)
{
+ return new BasicHeader("ES-Client-Authentication", "sharedsecret " +
jwtSharedSecret);
+ }
+
+ private BasicHeader createRunAsUserHeader(final String runAsUser) {
+ return new BasicHeader("es-security-runas-user", runAsUser);
+ }
+
+ private Request addJWTAuthorizationHeader(final Request request, final
OAuth2AccessTokenProvider tokenProvider) {
+ if (tokenProvider != null) {
+ final RequestOptions.Builder requestOptionsBuilder =
RequestOptions.DEFAULT.toBuilder();
+ requestOptionsBuilder.addHeader("Authorization", "Bearer " +
tokenProvider.getAccessDetails().getAccessToken());
+ request.setOptions(requestOptionsBuilder.build());
+ }
+ return request;
+ }
+
private Sniffer setupSniffer(final ConfigurationContext context, final
RestClient restClient) {
final boolean sniffClusterNodes =
context.getProperty(SNIFF_CLUSTER_NODES).asBoolean();
final int snifferIntervalMillis =
context.getProperty(SNIFFER_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
@@ -1016,7 +1030,7 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
}
private Response performRequest(final String method, final String
endpoint, final Map<String, String> parameters, final HttpEntity entity) throws
IOException {
- final Request request = new Request(method, endpoint);
+ final Request request = addJWTAuthorizationHeader(new Request(method,
endpoint), oAuth2AccessTokenProvider);
if (parameters != null && !parameters.isEmpty()) {
request.addParameters(parameters);
}
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
index 38b3997562..b9f0d7dd23 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
@@ -209,12 +209,12 @@ public class ElasticSearchLookupService extends
JsonInferenceSchemaRegistryServi
return null;
}
- final Map<String, Object> source = (Map<String, Object>)
response.getHits().get(0).get("_source");
+ final Map<String, Object> source = (Map<String, Object>)
response.getHits().getFirst().get("_source");
final RecordSchema toUse = getSchema(context, source, null);
Record record = new MapRecord(toUse, source);
- if (recordPathMappings.size() > 0) {
+ if (!recordPathMappings.isEmpty()) {
record = applyMappings(record, source);
}
@@ -239,7 +239,7 @@ public class ElasticSearchLookupService extends
JsonInferenceSchemaRegistryServi
put(e.getKey(), e.getValue());
}});
}
- }}).collect(Collectors.toList())
+ }}).toList()
);
}});
}};
@@ -256,10 +256,10 @@ public class ElasticSearchLookupService extends
JsonInferenceSchemaRegistryServi
if (response.getNumberOfHits() == 0) {
return null;
} else {
- final Map<String, Object> source = (Map<String, Object>)
response.getHits().get(0).get("_source");
+ final Map<String, Object> source = (Map<String, Object>)
response.getHits().getFirst().get("_source");
final RecordSchema toUse = getSchema(context, source, null);
Record record = new MapRecord(toUse, source);
- if (recordPathMappings.size() > 0) {
+ if (!recordPathMappings.isEmpty()) {
record = applyMappings(record, source);
}
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
index 2e79220847..fe6c637acc 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
@@ -263,7 +263,7 @@ class ElasticSearchClientService_IT extends
AbstractElasticsearch_IT {
"four", 4, "five", 5)
.build();
- buckets.forEach( (aggRes) -> {
+ buckets.forEach(aggRes -> {
final String key = (String) aggRes.get("key");
final Integer docCount = (Integer) aggRes.get("doc_count");
assertEquals(expected.get(key), docCount, String.format("%s did
not match.", key));
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java
index 8129e999d5..3fc0b5076f 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java
@@ -49,6 +49,7 @@ class ElasticSearchLookupService_IT extends
AbstractElasticsearch_IT {
private ElasticSearchLookupService lookupService;
+ @Override
@BeforeEach
void before() throws Exception {
super.before();
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchClientServiceImplTest.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchClientServiceImplTest.java
index 9fdbcee8cd..98fc5cd674 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchClientServiceImplTest.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchClientServiceImplTest.java
@@ -21,6 +21,7 @@ import org.apache.nifi.elasticsearch.AuthorizationScheme;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl;
import org.apache.nifi.elasticsearch.TestControllerServiceProcessor;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextProvider;
import org.apache.nifi.util.TestRunner;
@@ -72,14 +73,11 @@ class ElasticSearchClientServiceImplTest {
runner.assertValid(service);
runner.removeProperty(service, ElasticSearchClientService.PASSWORD);
-
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.USERNAME,
ElasticSearchClientService.PASSWORD);
-
- runner.removeProperty(service, ElasticSearchClientService.USERNAME);
- runner.assertValid(service);
+
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.PASSWORD);
runner.setProperty(service, ElasticSearchClientService.PASSWORD,
"password");
runner.removeProperty(service, ElasticSearchClientService.USERNAME);
-
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.PASSWORD,
ElasticSearchClientService.USERNAME);
+
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.USERNAME);
}
@Test
@@ -90,14 +88,11 @@ class ElasticSearchClientServiceImplTest {
runner.assertValid(service);
runner.removeProperty(service, ElasticSearchClientService.API_KEY_ID);
-
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.API_KEY,
ElasticSearchClientService.API_KEY_ID);
-
- runner.removeProperty(service, ElasticSearchClientService.API_KEY);
- runner.assertValid(service);
+
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.API_KEY_ID);
runner.setProperty(service, ElasticSearchClientService.API_KEY_ID,
"api-key-id");
runner.removeProperty(service, ElasticSearchClientService.API_KEY);
-
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.API_KEY_ID,
ElasticSearchClientService.API_KEY);
+
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.API_KEY);
}
@Test
@@ -114,9 +109,26 @@ class ElasticSearchClientServiceImplTest {
assertPKIAuthorizationValidationErrorMessage();
}
- private void assertAuthorizationPropertyValidationErrorMessage(final
PropertyDescriptor presentProperty, final PropertyDescriptor missingProperty) {
+ @Test
+ void testValidateJwtAuth() throws InitializationException {
+ runner.setProperty(service,
ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.JWT);
+ runner.setProperty(service,
ElasticSearchClientService.JWT_SHARED_SECRET, "jwt-shared-secret");
+
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.OAUTH2_ACCESS_TOKEN_PROVIDER);
+
+ final OAuth2AccessTokenProvider oAuth2AccessTokenProvider =
mock(OAuth2AccessTokenProvider.class);
+
when(oAuth2AccessTokenProvider.getIdentifier()).thenReturn("oauth2-access-token-provider");
+ runner.addControllerService("oauth2-access-token-provider",
oAuth2AccessTokenProvider);
+ runner.setProperty(service,
ElasticSearchClientService.OAUTH2_ACCESS_TOKEN_PROVIDER,
"oauth2-access-token-provider");
+ runner.assertValid(service);
+
+ runner.removeProperty(service,
ElasticSearchClientService.JWT_SHARED_SECRET);
+
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.JWT_SHARED_SECRET);
+ }
+
+ private void assertAuthorizationPropertyValidationErrorMessage(final
PropertyDescriptor missingProperty) {
final AssertionFailedError afe =
assertThrows(AssertionFailedError.class, () -> runner.assertValid(service));
- assertTrue(afe.getMessage().contains(String.format("if '%s' is then
'%s' must be set.", presentProperty.getDisplayName(),
missingProperty.getDisplayName())));
+ final String expectedMessage = String.format("%s is required",
missingProperty.getDisplayName());
+ assertTrue(afe.getMessage().contains(expectedMessage),
String.format("Validation error message \"%s\" does not contain \"%s\"",
afe.getMessage(), expectedMessage));
}
private void assertPKIAuthorizationValidationErrorMessage() {
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchStringLookupServiceTest.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchStringLookupServiceTest.java
index 16971faabf..7e19c90ca8 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchStringLookupServiceTest.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchStringLookupServiceTest.java
@@ -35,7 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class ElasticSearchStringLookupServiceTest {
+class ElasticSearchStringLookupServiceTest {
private ElasticSearchClientService mockClientService;
private ElasticSearchStringLookupService lookupService;
@@ -56,8 +56,9 @@ public class ElasticSearchStringLookupServiceTest {
runner.enableControllerService(lookupService);
}
+ @SuppressWarnings("unchecked")
@Test
- public void simpleLookupTest() throws Exception {
+ void simpleLookupTest() throws Exception {
Map<String, Object> coordinates = new HashMap<>();
coordinates.put(ElasticSearchStringLookupService.ID, "12345");
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
index ed7fda19e8..2a1b71a43a 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
@@ -72,8 +72,9 @@ language governing permissions and limitations under the
License. -->
<scope>test</scope>
</dependency>
<dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-oauth2-provider-api</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/pom.xml
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/pom.xml
index 1635b30ac7..cf760f6a9e 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/pom.xml
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/pom.xml
@@ -21,16 +21,6 @@ language governing permissions and limitations under the
License. -->
<packaging>jar</packaging>
<dependencies>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- <scope>compile</scope>
- </dependency>
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-api</artifactId>
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
index ae3db3cc07..163b8fad99 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
@@ -52,7 +52,7 @@ import static org.apache.http.auth.AuthScope.ANY;
public abstract class AbstractElasticsearchITBase {
// default Elasticsearch version should (ideally) match that in the
nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
protected static final DockerImageName IMAGE = DockerImageName
- .parse(System.getProperty("elasticsearch.docker.image",
"docker.elastic.co/elasticsearch/elasticsearch:8.15.1"));
+ .parse(System.getProperty("elasticsearch.docker.image",
"docker.elastic.co/elasticsearch/elasticsearch:8.17.0"));
protected static final String ELASTIC_USER_PASSWORD =
System.getProperty("elasticsearch.elastic_user.password",
RandomStringUtils.randomAlphanumeric(10, 20));
private static final int PORT = 9200;
protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER =
new ElasticsearchContainer(IMAGE)
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
b/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
index 2f5a552451..dd7f1c4153 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -114,7 +114,7 @@ language governing permissions and limitations under the
License. -->
<profile>
<id>elasticsearch7</id>
<properties>
-
<elasticsearch_docker_image>7.17.23</elasticsearch_docker_image>
+
<elasticsearch_docker_image>7.17.26</elasticsearch_docker_image>
</properties>
</profile>
</profiles>