This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 56d8879e91 NIFI-11107 In ConsumeIMAP and ConsumePOP3 added support for
OAUTH based authorization.
56d8879e91 is described below
commit 56d8879e91beb0c980ffe1df8148458b4e58fc25
Author: Tamas Palfy <[email protected]>
AuthorDate: Tue Jan 17 17:45:11 2023 +0100
NIFI-11107 In ConsumeIMAP and ConsumePOP3 added support for OAUTH based
authorization.
This closes #6900.
Reviewed-by: Nandor Soma Abonyi <[email protected]>
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../nifi-email-processors/pom.xml | 5 ++
.../processors/email/AbstractEmailProcessor.java | 65 +++++++++++++++++++++-
2 files changed, 67 insertions(+), 3 deletions(-)
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml
index b340d3bf8c..0a349e85a5 100644
--- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml
@@ -39,6 +39,11 @@
<artifactId>nifi-utils</artifactId>
<version>1.20.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-oauth2-provider-api</artifactId>
+ <version>1.20.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java
index 19c64971fe..a8b2cf0dbc 100644
--- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java
@@ -16,10 +16,13 @@
*/
package org.apache.nifi.processors.email;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -43,6 +46,7 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
@@ -56,7 +60,17 @@ import java.util.concurrent.TimeUnit;
* @param <T> the type of {@link AbstractMailReceiver}.
*/
abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends
AbstractProcessor {
-
+ public static final AllowableValue PASSWORD_BASED_AUTHORIZATION_MODE = new
AllowableValue(
+ "password-based-authorization-mode",
+ "Use Password",
+ "Use password"
+ );
+
+ public static final AllowableValue OAUTH_AUTHORIZATION_MODE = new
AllowableValue(
+ "oauth-based-authorization-mode",
+ "Use OAuth2",
+ "Use OAuth2 to acquire access token"
+ );
public static final PropertyDescriptor HOST = new
PropertyDescriptor.Builder()
.name("host")
.displayName("Host Name")
@@ -73,6 +87,22 @@ abstract class AbstractEmailProcessor<T extends
AbstractMailReceiver> extends Ab
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
+ public static final PropertyDescriptor AUTHORIZATION_MODE = new
PropertyDescriptor.Builder()
+ .name("authorization-mode")
+ .displayName("Authorization Mode")
+ .description("How to authorize sending email on the user's behalf.")
+ .required(true)
+ .allowableValues(PASSWORD_BASED_AUTHORIZATION_MODE,
OAUTH_AUTHORIZATION_MODE)
+ .defaultValue(PASSWORD_BASED_AUTHORIZATION_MODE.getValue())
+ .build();
+ public static final PropertyDescriptor OAUTH2_ACCESS_TOKEN_PROVIDER = new
PropertyDescriptor.Builder()
+ .name("oauth2-access-token-provider")
+ .displayName("OAuth2 Access Token Provider")
+ .description("OAuth2 service that can provide access tokens.")
+ .identifiesControllerService(OAuth2AccessTokenProvider.class)
+ .dependsOn(AUTHORIZATION_MODE, OAUTH_AUTHORIZATION_MODE)
+ .required(true)
+ .build();
public static final PropertyDescriptor USER = new
PropertyDescriptor.Builder()
.name("user")
.displayName("User Name")
@@ -85,6 +115,7 @@ abstract class AbstractEmailProcessor<T extends
AbstractMailReceiver> extends Ab
.name("password")
.displayName("Password")
.description("Password used for authentication and authorization with Email server.")
+ .dependsOn(AUTHORIZATION_MODE, PASSWORD_BASED_AUTHORIZATION_MODE)
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -143,6 +174,8 @@ abstract class AbstractEmailProcessor<T extends
AbstractMailReceiver> extends Ab
static {
SHARED_DESCRIPTORS.add(HOST);
SHARED_DESCRIPTORS.add(PORT);
+ SHARED_DESCRIPTORS.add(AUTHORIZATION_MODE);
+ SHARED_DESCRIPTORS.add(OAUTH2_ACCESS_TOKEN_PROVIDER);
SHARED_DESCRIPTORS.add(USER);
SHARED_DESCRIPTORS.add(PASSWORD);
SHARED_DESCRIPTORS.add(FOLDER);
@@ -165,6 +198,21 @@ abstract class AbstractEmailProcessor<T extends
AbstractMailReceiver> extends Ab
private volatile boolean shouldSetDeleteFlag;
+ protected volatile Optional<OAuth2AccessTokenProvider>
oauth2AccessTokenProviderOptional;
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ if (context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).isSet()) {
+ OAuth2AccessTokenProvider oauth2AccessTokenProvider = context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+
+ oauth2AccessTokenProvider.getAccessDetails();
+
+ oauth2AccessTokenProviderOptional = Optional.of(oauth2AccessTokenProvider);
+ } else {
+ oauth2AccessTokenProviderOptional = Optional.empty();
+ }
+ }
+
@OnStopped
public void stop(ProcessContext processContext) {
this.flushRemainingMessages(processContext);
@@ -229,7 +277,13 @@ abstract class AbstractEmailProcessor<T extends
AbstractMailReceiver> extends Ab
String host =
processContext.getProperty(HOST).evaluateAttributeExpressions().getValue();
String port =
processContext.getProperty(PORT).evaluateAttributeExpressions().getValue();
String user =
processContext.getProperty(USER).evaluateAttributeExpressions().getValue();
- String password =
processContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+
+ String password = oauth2AccessTokenProviderOptional.map(oauth2AccessTokenProvider -> {
+ String accessToken = oauth2AccessTokenProvider.getAccessDetails().getAccessToken();
+
+ return accessToken;
+ }).orElse(processContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue());
+
String folder =
processContext.getProperty(FOLDER).evaluateAttributeExpressions().getValue();
StringBuilder urlBuilder = new StringBuilder();
@@ -304,9 +358,14 @@ abstract class AbstractEmailProcessor<T extends
AbstractMailReceiver> extends Ab
propertyDescriptorEntry.getValue());
}
}
- String propertyName = this.getProtocol(context).equals("pop3") ? "mail.pop3.timeout" : "mail.imap.timeout";
+ String protocol = this.getProtocol(context);
+
+ String propertyName = protocol.equals("pop3") ? "mail.pop3.timeout" : "mail.imap.timeout";
final String timeoutInMillis =
String.valueOf(context.getProperty(CONNECTION_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS));
javaMailProperties.setProperty(propertyName, timeoutInMillis);
+
+ oauth2AccessTokenProviderOptional.ifPresent(oauth2AccessTokenProvider -> javaMailProperties.put("mail." + protocol + ".auth.mechanisms", "XOAUTH2"));
+
return javaMailProperties;
}