[
https://issues.apache.org/jira/browse/NIFI-2519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15416383#comment-15416383
]
ASF GitHub Bot commented on NIFI-2519:
--------------------------------------
Github user olegz commented on a diff in the pull request:
https://github.com/apache/nifi/pull/827#discussion_r74361967
--- Diff:
nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
---
@@ -166,317 +108,158 @@
.identifiesControllerService(SSLContextService.class)
.build();
- public static final PropertyDescriptor CLIENT_AUTH = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor CLIENT_AUTH = new
PropertyDescriptor.Builder()
.name("CLIENT_AUTH")
.displayName("Client Auth")
.description("The client authentication policy to use for the
SSL Context. Only used if an SSL Context Service is provided.")
.required(false)
.allowableValues(SSLContextService.ClientAuth.NONE.toString(),
SSLContextService.ClientAuth.REQUIRED.toString())
.build();
- @Override
- protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
- final List<ValidationResult> results = new ArrayList<>();
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All new messages will be routed as FlowFiles to
this relationship")
+ .build();
- final String clientAuth =
validationContext.getProperty(CLIENT_AUTH).getValue();
- final SSLContextService sslContextService =
validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ private final static List<PropertyDescriptor> propertyDescriptors;
- if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
- results.add(new ValidationResult.Builder()
- .explanation("Client Auth must be provided when using
TLS/SSL")
- .valid(false).subject("Client Auth").build());
- }
+ private final static Set<Relationship> relationships;
- return results;
+ static {
+ List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+ _propertyDescriptors.add(SMTP_PORT);
+ _propertyDescriptors.add(SMTP_MAXIMUM_CONNECTIONS);
+ _propertyDescriptors.add(SMTP_TIMEOUT);
+ _propertyDescriptors.add(SMTP_MAXIMUM_MSG_SIZE);
+ _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
+ _propertyDescriptors.add(CLIENT_AUTH);
+ propertyDescriptors =
Collections.unmodifiableList(_propertyDescriptors);
+ Set<Relationship> _relationships = new HashSet<>();
+ _relationships.add(REL_SUCCESS);
+ relationships = Collections.unmodifiableSet(_relationships);
}
+ private volatile SMTPServer smtp;
- public static final Relationship REL_SUCCESS = new
Relationship.Builder()
- .name("success")
- .description("Extraction was successful")
- .build();
+ private volatile SmtpConsumer smtpConsumer;
- private Set<Relationship> relationships;
- private List<PropertyDescriptor> propertyDescriptors;
- private volatile LinkedBlockingQueue<SmtpEvent> incomingMessages;
+ /**
+ *
+ */
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSessionFactory
sessionFactory) throws ProcessException {
+ ProcessSession processSession = sessionFactory.createSession();
+ if (this.smtp == null) {
+ this.setupSmtpIfNecessary(context, processSession);
+ }
+
+ if (this.smtpConsumer.hasMessage()) {
+ try {
+ /*
+ * Will consume incoming message directly from the wire
and into
+ * FlowFile/Content repository before exiting. This
essentially
+ * limits any potential data loss by allowing SMTPServer
thread
+ * to actually commit NiFi session if all good. However in
the
+ * event of exception, such exception will be propagated
back to
+ * the email sender via "undeliverable message" allowing
such
+ * user to re-send the message
+ */
+ this.smtpConsumer.consumeUsing((inputDataStream) -> {
+ FlowFile flowFile = processSession.create();
+ AtomicInteger size = new AtomicInteger();
+ flowFile = processSession.write(flowFile, new
OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws
IOException {
+ size.set(IOUtils.copy(inputDataStream, out));
+ }
+ });
+
processSession.getProvenanceReporter().receive(flowFile, "smtp://"
+ + ListenSMTP.this.smtp.getHostName() + ":" +
ListenSMTP.this.smtp.getPort() + "/");
+ processSession.transfer(flowFile, REL_SUCCESS);
+ processSession.commit();
+ return size.get();
+ });
+ } catch (Exception e) {
+ this.getLogger().error("Failed while listenning for
messages.", e);
+ processSession.rollback();
+ }
+ } else {
+ context.yield();
+ }
+ }
- private volatile SMTPServer server;
- private AtomicBoolean initialized = new AtomicBoolean(false);
- private AtomicBoolean stopping = new AtomicBoolean(false);
+ /**
+ *
+ */
+ @OnStopped
+ public void close() {
+ this.getLogger().info("Stopping SMTPServer");
+ this.smtp.stop();
+ this.smtp = null;
+ this.getLogger().info("SMTPServer stopped");
+ }
+ /**
+ *
+ */
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
+ /**
+ *
+ */
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
- @Override
- protected void init(final ProcessorInitializationContext context) {
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- this.relationships = Collections.unmodifiableSet(relationships);
-
- final List<PropertyDescriptor> props = new ArrayList<>();
- props.add(SMTP_PORT);
- props.add(SMTP_HOSTNAME);
- props.add(SMTP_MAXIMUM_CONNECTIONS);
- props.add(SMTP_TIMEOUT);
- props.add(SMTP_MAXIMUM_MSG_SIZE);
- props.add(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE);
- props.add(SSL_CONTEXT_SERVICE);
- props.add(CLIENT_AUTH);
- this.propertyDescriptors = Collections.unmodifiableList(props);
-
- }
-
- // Upon Schedule, reset the initialized state to false
- @OnScheduled
- public void onScheduled(ProcessContext context) {
- initialized.set(false);
- stopping.set(false);
- }
-
- protected synchronized void initializeSMTPServer(final ProcessContext
context) throws Exception {
-
- // check if we are already running or if it is stopping
- if (initialized.get() && server.isRunning() || stopping.get() ) {
- return;
- }
-
- incomingMessages = new
LinkedBlockingQueue<>(context.getProperty(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE).asInteger());
-
- String clientAuth = null;
-
- // If an SSLContextService was provided then create an SSLContext
to pass down to the server
- SSLContext sslContext = null;
- final SSLContextService sslContextService =
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
- if (sslContextService != null) {
- clientAuth = context.getProperty(CLIENT_AUTH).getValue();
- sslContext =
sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth));
+ /**
+ *
+ */
+ private synchronized void setupSmtpIfNecessary(ProcessContext context,
ProcessSession processSession) {
+ if (this.smtp == null) {
+ SmtpConsumer consumer = new SmtpConsumer();
+ SMTPServer smtpServer = this.createServerInstance(context,
consumer);
+ smtpServer.setSoftwareName("Apache NiFi");
+ smtpServer.setPort(context.getProperty(SMTP_PORT).asInteger());
+
smtpServer.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue());
+
smtpServer.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+
+ this.smtpConsumer = consumer;
+ this.smtp = smtpServer;
+ this.smtp.start();
}
+ }
- final SSLContext finalSslContext = sslContext;
-
- SMTPMessageHandlerFactory smtpMessageHandlerFactory = new
SMTPMessageHandlerFactory(incomingMessages, getLogger());
- final SMTPServer server = new
SMTPServer(smtpMessageHandlerFactory) {
-
+ /**
+ *
+ */
+ private SMTPServer createServerInstance(ProcessContext context,
SmtpConsumer consumer) {
+ SSLContextService sslContextService =
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ SMTPServer smtpServer = sslContextService == null ? new
SMTPServer(consumer) : new SMTPServer(consumer) {
@Override
public SSLSocket createSSLSocket(Socket socket) throws
IOException {
InetSocketAddress remoteAddress = (InetSocketAddress)
socket.getRemoteSocketAddress();
-
- SSLSocketFactory socketFactory =
finalSslContext.getSocketFactory();
-
- SSLSocket s = (SSLSocket)
(socketFactory.createSocket(socket, remoteAddress.getHostName(),
socket.getPort(), true));
-
- s.setUseClientMode(false);
-
-
- // For some reason the createSSLContext above is not
enough to enforce
- // client side auth
- // If client auth is required...
- if
(SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue()))
{
- s.setNeedClientAuth(true);
+ String clientAuth =
context.getProperty(CLIENT_AUTH).getValue();
+ SSLContext sslContext =
sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth));
+ SSLSocketFactory socketFactory =
sslContext.getSocketFactory();
+ SSLSocket sslSocket = (SSLSocket)
(socketFactory.createSocket(socket,
remoteAddress.getHostName(),socket.getPort(), true));
+ sslSocket.setUseClientMode(false);
+
+ if
(SSLContextService.ClientAuth.REQUIRED.toString().equals(clientAuth)) {
+ this.setRequireTLS(true);
+ sslSocket.setNeedClientAuth(true);
}
-
- return s;
+ return sslSocket;
}
};
-
- // Set some parameters to our server
- server.setSoftwareName("Apache NiFi");
-
-
- // Set the Server options based on properties
- server.setPort(context.getProperty(SMTP_PORT).asInteger());
- server.setHostName(context.getProperty(SMTP_HOSTNAME).getValue());
-
server.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue());
-
server.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger());
-
server.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
-
-
- // Check if TLS should be enabled
if (sslContextService != null) {
- server.setEnableTLS(true);
- } else {
- server.setHideTLS(true);
--- End diff --
Fair enough. Thanks for heads up.
> TestListenSMTP ValidEmail fails during parallel build
> -----------------------------------------------------
>
> Key: NIFI-2519
> URL: https://issues.apache.org/jira/browse/NIFI-2519
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Reporter: Joseph Witt
> Assignee: Oleg Zhurakousky
> Fix For: 1.0.0
>
>
> While running a full NiFi parallel build received the following. So there is
> some test issue at least that is impacting build stability.
> [INFO] --- maven-compiler-plugin:3.2:testCompile (default-testCompile) @
> nifi-email-processors ---
> [INFO] Changes detected - recompiling the module!
> [INFO] Compiling 4 source files to
> /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/target/test-classes
> [WARNING]
> /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java:[122,24]
> [deprecation] stop() in Thread has been deprecated
> [WARNING]
> /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java:[186,24]
> [deprecation] stop() in Thread has been deprecated
> [WARNING]
> /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java:[307,24]
> [deprecation] stop() in Thread has been deprecated
> [INFO]
> [INFO] --- maven-compiler-plugin:3.2:testCompile (groovy-tests) @
> nifi-email-processors ---
> [INFO] Changes detected - recompiling the module!
> [INFO] Nothing to compile - all classes are up to date
> [INFO]
> [INFO] --- maven-surefire-plugin:2.18:test (default-test) @
> nifi-email-processors ---
> [INFO] Surefire report directory:
> /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/target/surefire-reports
> [INFO] Using configured provider
> org.apache.maven.surefire.junit4.JUnit4Provider
> -------------------------------------------------------
> T E S T S
> -------------------------------------------------------
> Running org.apache.nifi.processors.email.TestListenSMTP
> Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1.473 sec <<<
> FAILURE! - in org.apache.nifi.processors.email.TestListenSMTP
> ValidEmail(org.apache.nifi.processors.email.TestListenSMTP) Time elapsed:
> 0.038 sec <<< FAILURE!
> java.lang.AssertionError: Sending email failed
> at org.junit.Assert.fail(Assert.java:88)
> at org.junit.Assert.assertTrue(Assert.java:41)
> at org.junit.Assert.assertFalse(Assert.java:64)
> at
> org.apache.nifi.processors.email.TestListenSMTP.ValidEmail(TestListenSMTP.java:188)
> Running org.apache.nifi.processors.email.TestExtractEmailAttachments
> Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.1 sec - in
> org.apache.nifi.processors.email.TestExtractEmailAttachments
> Running org.apache.nifi.processors.email.TestExtractEmailHeaders
> Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.029 sec -
> in org.apache.nifi.processors.email.TestExtractEmailHeaders
> Results :
> Failed tests:
> TestListenSMTP.ValidEmail:188 Sending email failed
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)