[
https://issues.apache.org/jira/browse/NIFI-2519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15416426#comment-15416426
]
ASF GitHub Bot commented on NIFI-2519:
--------------------------------------
Github user trixpan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/827#discussion_r74363458
--- Diff:
nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
---
@@ -166,317 +108,158 @@
.identifiesControllerService(SSLContextService.class)
.build();
- public static final PropertyDescriptor CLIENT_AUTH = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor CLIENT_AUTH = new
PropertyDescriptor.Builder()
.name("CLIENT_AUTH")
.displayName("Client Auth")
.description("The client authentication policy to use for the
SSL Context. Only used if an SSL Context Service is provided.")
.required(false)
.allowableValues(SSLContextService.ClientAuth.NONE.toString(),
SSLContextService.ClientAuth.REQUIRED.toString())
.build();
- @Override
- protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
- final List<ValidationResult> results = new ArrayList<>();
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All new messages will be routed as FlowFiles to
this relationship")
+ .build();
- final String clientAuth =
validationContext.getProperty(CLIENT_AUTH).getValue();
- final SSLContextService sslContextService =
validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ private final static List<PropertyDescriptor> propertyDescriptors;
- if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
- results.add(new ValidationResult.Builder()
- .explanation("Client Auth must be provided when using
TLS/SSL")
- .valid(false).subject("Client Auth").build());
- }
+ private final static Set<Relationship> relationships;
- return results;
+ static {
+ List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+ _propertyDescriptors.add(SMTP_PORT);
+ _propertyDescriptors.add(SMTP_MAXIMUM_CONNECTIONS);
+ _propertyDescriptors.add(SMTP_TIMEOUT);
+ _propertyDescriptors.add(SMTP_MAXIMUM_MSG_SIZE);
+ _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
+ _propertyDescriptors.add(CLIENT_AUTH);
+ propertyDescriptors =
Collections.unmodifiableList(_propertyDescriptors);
+ Set<Relationship> _relationships = new HashSet<>();
+ _relationships.add(REL_SUCCESS);
+ relationships = Collections.unmodifiableSet(_relationships);
}
+ private volatile SMTPServer smtp;
- public static final Relationship REL_SUCCESS = new
Relationship.Builder()
- .name("success")
- .description("Extraction was successful")
- .build();
+ private volatile SmtpConsumer smtpConsumer;
- private Set<Relationship> relationships;
- private List<PropertyDescriptor> propertyDescriptors;
- private volatile LinkedBlockingQueue<SmtpEvent> incomingMessages;
+ /**
+ *
+ */
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSessionFactory
sessionFactory) throws ProcessException {
+ ProcessSession processSession = sessionFactory.createSession();
+ if (this.smtp == null) {
+ this.setupSmtpIfNecessary(context, processSession);
+ }
+
+ if (this.smtpConsumer.hasMessage()) {
+ try {
+ /*
+ * Will consume incoming message directly from the wire
and into
+ * FlowFile/Content repository before exiting. This
essentially
+ * limits any potential data loss by allowing SMTPServer
thread
+ * to actually commit NiFi session if all good. However in
the
+ * event of exception, such exception will be propagated
back to
+ * the email sender via "undeliverable message" allowing
such
+ * user to re-send the message
+ */
+ this.smtpConsumer.consumeUsing((inputDataStream) -> {
+ FlowFile flowFile = processSession.create();
+ AtomicInteger size = new AtomicInteger();
+ flowFile = processSession.write(flowFile, new
OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws
IOException {
+ size.set(IOUtils.copy(inputDataStream, out));
+ }
+ });
+
processSession.getProvenanceReporter().receive(flowFile, "smtp://"
+ + ListenSMTP.this.smtp.getHostName() + ":" +
ListenSMTP.this.smtp.getPort() + "/");
+ processSession.transfer(flowFile, REL_SUCCESS);
+ processSession.commit();
+ return size.get();
+ });
+ } catch (Exception e) {
+ this.getLogger().error("Failed while listenning for
messages.", e);
+ processSession.rollback();
+ }
+ } else {
+ context.yield();
+ }
+ }
- private volatile SMTPServer server;
- private AtomicBoolean initialized = new AtomicBoolean(false);
- private AtomicBoolean stopping = new AtomicBoolean(false);
+ /**
+ *
+ */
+ @OnStopped
+ public void close() {
+ this.getLogger().info("Stopping SMTPServer");
+ this.smtp.stop();
+ this.smtp = null;
+ this.getLogger().info("SMTPServer stopped");
+ }
+ /**
+ *
+ */
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
+ /**
+ *
+ */
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
- @Override
- protected void init(final ProcessorInitializationContext context) {
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- this.relationships = Collections.unmodifiableSet(relationships);
-
- final List<PropertyDescriptor> props = new ArrayList<>();
- props.add(SMTP_PORT);
- props.add(SMTP_HOSTNAME);
- props.add(SMTP_MAXIMUM_CONNECTIONS);
- props.add(SMTP_TIMEOUT);
- props.add(SMTP_MAXIMUM_MSG_SIZE);
- props.add(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE);
- props.add(SSL_CONTEXT_SERVICE);
- props.add(CLIENT_AUTH);
- this.propertyDescriptors = Collections.unmodifiableList(props);
-
- }
-
- // Upon Schedule, reset the initialized state to false
- @OnScheduled
- public void onScheduled(ProcessContext context) {
- initialized.set(false);
- stopping.set(false);
- }
-
- protected synchronized void initializeSMTPServer(final ProcessContext
context) throws Exception {
-
- // check if we are already running or if it is stopping
- if (initialized.get() && server.isRunning() || stopping.get() ) {
- return;
- }
-
- incomingMessages = new
LinkedBlockingQueue<>(context.getProperty(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE).asInteger());
-
- String clientAuth = null;
-
- // If an SSLContextService was provided then create an SSLContext
to pass down to the server
- SSLContext sslContext = null;
- final SSLContextService sslContextService =
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
- if (sslContextService != null) {
- clientAuth = context.getProperty(CLIENT_AUTH).getValue();
- sslContext =
sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth));
+ /**
+ *
+ */
+ private synchronized void setupSmtpIfNecessary(ProcessContext context,
ProcessSession processSession) {
+ if (this.smtp == null) {
+ SmtpConsumer consumer = new SmtpConsumer();
+ SMTPServer smtpServer = this.createServerInstance(context,
consumer);
+ smtpServer.setSoftwareName("Apache NiFi");
+ smtpServer.setPort(context.getProperty(SMTP_PORT).asInteger());
+
smtpServer.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue());
+
smtpServer.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+
+ this.smtpConsumer = consumer;
+ this.smtp = smtpServer;
+ this.smtp.start();
}
+ }
- final SSLContext finalSslContext = sslContext;
-
- SMTPMessageHandlerFactory smtpMessageHandlerFactory = new
SMTPMessageHandlerFactory(incomingMessages, getLogger());
- final SMTPServer server = new
SMTPServer(smtpMessageHandlerFactory) {
-
+ /**
+ *
+ */
+ private SMTPServer createServerInstance(ProcessContext context,
SmtpConsumer consumer) {
+ SSLContextService sslContextService =
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ SMTPServer smtpServer = sslContextService == null ? new
SMTPServer(consumer) : new SMTPServer(consumer) {
@Override
public SSLSocket createSSLSocket(Socket socket) throws
IOException {
InetSocketAddress remoteAddress = (InetSocketAddress)
socket.getRemoteSocketAddress();
-
- SSLSocketFactory socketFactory =
finalSslContext.getSocketFactory();
-
- SSLSocket s = (SSLSocket)
(socketFactory.createSocket(socket, remoteAddress.getHostName(),
socket.getPort(), true));
-
- s.setUseClientMode(false);
-
-
- // For some reason the createSSLContext above is not
enough to enforce
- // client side auth
- // If client auth is required...
- if
(SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue()))
{
- s.setNeedClientAuth(true);
+ String clientAuth =
context.getProperty(CLIENT_AUTH).getValue();
+ SSLContext sslContext =
sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth));
+ SSLSocketFactory socketFactory =
sslContext.getSocketFactory();
+ SSLSocket sslSocket = (SSLSocket)
(socketFactory.createSocket(socket,
remoteAddress.getHostName(),socket.getPort(), true));
+ sslSocket.setUseClientMode(false);
+
+ if
(SSLContextService.ClientAuth.REQUIRED.toString().equals(clientAuth)) {
+ this.setRequireTLS(true);
+ sslSocket.setNeedClientAuth(true);
}
-
- return s;
+ return sslSocket;
}
};
-
- // Set some parameters to our server
- server.setSoftwareName("Apache NiFi");
-
-
- // Set the Server options based on properties
- server.setPort(context.getProperty(SMTP_PORT).asInteger());
- server.setHostName(context.getProperty(SMTP_HOSTNAME).getValue());
-
server.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue());
-
server.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger());
-
server.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
-
-
- // Check if TLS should be enabled
if (sslContextService != null) {
- server.setEnableTLS(true);
- } else {
- server.setHideTLS(true);
- }
-
- // Set TLS to required in case CLIENT_AUTH = required
- if
(SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue()))
{
- server.setRequireTLS(true);
- }
-
- this.server = server;
- server.start();
-
- getLogger().info("Server started and listening on port " +
server.getPort());
-
- initialized.set(true);
- stopping.set(false);
- }
-
- @OnUnscheduled
- public void startShutdown() throws Exception {
- if (server != null) {
- stopping.set(true);
- getLogger().info("Shutting down processor P{}", new
Object[]{server});
- server.stop();
- getLogger().info("Shut down {}", new Object[]{server});
- }
- }
-
- @OnStopped
- public void completeShutdown() throws Exception {
- if (server != null) {
- if (!server.isRunning() && stopping.get() ) {
- stopping.set(false);
- }
- getLogger().info("Completed shut down {}", new
Object[]{server});
- }
- }
-
- @Override
- public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
-
- try {
- initializeSMTPServer(context);
- } catch (Exception e) {
- context.yield();
- throw new ProcessException("Failed to initialize the SMTP
server", e);
- }
-
- while (!incomingMessages.isEmpty()) {
- SmtpEvent message = incomingMessages.poll();
-
- if (message == null) {
- return;
- }
-
- synchronized (message) {
- if (resultCodeSetAndIsError(message)) {
- SMTPResultCode resultCode =
SMTPResultCode.fromCode(message.getReturnCode());
- getLogger().warn("Message failed before onTrigger
processing message was: " + resultCode.getLogMessage());
- continue;
- }
-
- try {
- FlowFile flowfile = session.create();
-
- if (message.getMessageData() != null) {
- flowfile = session.write(flowfile, out -> {
- InputStream inputStream =
message.getMessageData();
- byte [] buffer = new byte[1024];
-
- int rd;
- long totalBytesRead =0;
-
- while ((rd = inputStream.read(buffer, 0,
buffer.length)) != -1 ) {
- totalBytesRead += rd;
- if (totalBytesRead >
server.getMaxMessageSize() ) {
- message.setReturnCode(500);
- message.setProcessed();
- break;
- }
- out.write(buffer, 0, rd);
- }
- out.flush();
- });
- } else {
- getLogger().debug("Message body was null");
-
message.setReturnCode(SMTPResultCode.UNKNOWN_ERROR_CODE.getCode());
- message.setProcessed();
- }
-
- if (!message.getProcessed()) {
- HashMap<String, String> attributes = new
HashMap<>();
- // Gather message attributes
- attributes.put(SMTP_HELO, message.getHelo());
- attributes.put(SMTP_SRC_IP, message.getHelo());
- attributes.put(SMTP_FROM, message.getFrom());
- attributes.put(SMTP_TO, message.getTo());
-
- List<Map<String, String>> details =
message.getCertifcateDetails();
- int c = 0;
-
- // Add a selection of each X509 certificates to
the already gathered attributes
-
- for (Map<String, String> detail : details) {
- attributes.put("smtp.certificate." + c +
".serial", detail.getOrDefault("SerialNumber", null));
- attributes.put("smtp.certificate." + c +
".subjectName", detail.getOrDefault("SubjectName", null));
--- End diff --
It boils down to chain of custody.
Imagine Joe Blogs and Luo Cipher both work for ACME corp and are issued
certificated to send messages to my NiFi instance using TLS and requiring
client Auth.
Since both are authorised, you want to distinguish from your authorised
users who sent a particular email to NiFi, hence you store the subjectName used
to send the message.
This by the way is the same approach used around NiFi

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.HandleHttpRequest/index.html
> TestListenSMTP ValidEmail fails during parallel build
> -----------------------------------------------------
>
> Key: NIFI-2519
> URL: https://issues.apache.org/jira/browse/NIFI-2519
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Reporter: Joseph Witt
> Assignee: Oleg Zhurakousky
> Fix For: 1.0.0
>
>
> While running a full NiFi parallel build received the following. So there is
> some test issue at least that is impacting build stability.
> [INFO] --- maven-compiler-plugin:3.2:testCompile (default-testCompile) @
> nifi-email-processors ---
> [INFO] Changes detected - recompiling the module!
> [INFO] Compiling 4 source files to
> /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/target/test-classes
> [WARNING]
> /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java:[122,24]
> [deprecation] stop() in Thread has been deprecated
> [WARNING]
> /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java:[186,24]
> [deprecation] stop() in Thread has been deprecated
> [WARNING]
> /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java:[307,24]
> [deprecation] stop() in Thread has been deprecated
> [INFO]
> [INFO] --- maven-compiler-plugin:3.2:testCompile (groovy-tests) @
> nifi-email-processors ---
> [INFO] Changes detected - recompiling the module!
> [INFO] Nothing to compile - all classes are up to date
> [INFO]
> [INFO] --- maven-surefire-plugin:2.18:test (default-test) @
> nifi-email-processors ---
> [INFO] Surefire report directory:
> /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/target/surefire-reports
> [INFO] Using configured provider
> org.apache.maven.surefire.junit4.JUnit4Provider
> -------------------------------------------------------
> T E S T S
> -------------------------------------------------------
> Running org.apache.nifi.processors.email.TestListenSMTP
> Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1.473 sec <<<
> FAILURE! - in org.apache.nifi.processors.email.TestListenSMTP
> ValidEmail(org.apache.nifi.processors.email.TestListenSMTP) Time elapsed:
> 0.038 sec <<< FAILURE!
> java.lang.AssertionError: Sending email failed
> at org.junit.Assert.fail(Assert.java:88)
> at org.junit.Assert.assertTrue(Assert.java:41)
> at org.junit.Assert.assertFalse(Assert.java:64)
> at
> org.apache.nifi.processors.email.TestListenSMTP.ValidEmail(TestListenSMTP.java:188)
> Running org.apache.nifi.processors.email.TestExtractEmailAttachments
> Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.1 sec - in
> org.apache.nifi.processors.email.TestExtractEmailAttachments
> Running org.apache.nifi.processors.email.TestExtractEmailHeaders
> Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.029 sec -
> in org.apache.nifi.processors.email.TestExtractEmailHeaders
> Results :
> Failed tests:
> TestListenSMTP.ValidEmail:188 Sending email failed
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)