Github user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi/pull/483#discussion_r70639371
--- Diff:
nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java
---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.email.smtp.handler;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.util.StopWatch;
+import org.subethamail.smtp.DropConnectionException;
+import org.subethamail.smtp.MessageContext;
+import org.subethamail.smtp.MessageHandler;
+import org.subethamail.smtp.MessageHandlerFactory;
+import org.subethamail.smtp.RejectException;
+import org.subethamail.smtp.TooMuchDataException;
+import org.subethamail.smtp.server.SMTPServer;
+
+import org.apache.nifi.processors.email.smtp.event.SmtpEvent;
+
+
+public class SMTPMessageHandlerFactory implements MessageHandlerFactory {
+ final LinkedBlockingQueue<SmtpEvent> incomingMessages;
+ final ComponentLog logger;
+
+ public SMTPMessageHandlerFactory(LinkedBlockingQueue<SmtpEvent>
incomingMessages, ComponentLog logger) {
+ this.incomingMessages = incomingMessages;
+ this.logger = logger;
+ }
+
+ @Override
+ public MessageHandler create(MessageContext messageContext) {
+ return new Handler(messageContext, incomingMessages, logger);
+ }
+
+ class Handler implements MessageHandler {
+ final MessageContext messageContext;
+ String from;
+ String recipient;
+ byte [] messageBody;
+
+
+ public Handler(MessageContext messageContext,
LinkedBlockingQueue<SmtpEvent> incomingMessages, ComponentLog logger){
+ this.messageContext = messageContext;
+ }
+
+ @Override
+ public void from(String from) throws RejectException {
+ // TODO: possibly whitelist senders?
+ this.from = from;
+ }
+
+ @Override
+ public void recipient(String recipient) throws RejectException {
+ // TODO: possibly whitelist receivers?
+ this.recipient = recipient;
+ }
+
+ @Override
+ public void data(InputStream inputStream) throws RejectException,
TooMuchDataException, IOException {
+ // Start counting the timer...
+
+ StopWatch watch = new StopWatch(false);
+
+ SMTPServer server = messageContext.getSMTPServer();
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ byte [] buffer = new byte[1024];
+ int rd;
+
+ while ((rd = inputStream.read(buffer, 0, buffer.length)) !=
-1) {
+ baos.write(buffer, 0, rd);
+ }
+ if (baos.getBufferLength() > server.getMaxMessageSize()) {
+ throw new TooMuchDataException("Data exceeds the amount
allowed.");
+ }
+
+ baos.flush();
+ this.messageBody = baos.toByteArray();
+
+
+ X509Certificate[] certificates = new X509Certificate[]{};
+
+ String remoteIP = messageContext.getRemoteAddress().toString();
+ String helo = messageContext.getHelo();
+ if (messageContext.getTlsPeerCertificates() != null ){
+ certificates = (X509Certificate[])
messageContext.getTlsPeerCertificates().clone();
+ }
+
+ SmtpEvent message = new SmtpEvent(remoteIP, helo, from,
recipient, certificates, messageBody);
+ try {
+ // Try to queue the message back to the NiFi session
+ incomingMessages.put(message);
+ } catch (InterruptedException e) {
+ // Throws an error to the incoming server alerting about
issues queuing the message in NiFi
+ logger.error("Hit an error sending message back to NiFi
main thread {}. Sending and error back to SMTP client and discarding message",
new Object [] {e});
+ throw new DropConnectionException(451, "NiFi something
went wrong while processing you message, please retry again later");
+ }
+
+ // Once message has been sent to the queue, it should be
processed by NiFi onTrigger,
+ // a flowfile created and its processed status updated before
an acknowledgment is
+ // given back to the SMTP client
+ try {
+ synchronized(message) {
+ while(! message.getProcessed()) {
+ // Check to see if it is too late...
+ final long serverTimeout =
TimeUnit.NANOSECONDS.convert(messageContext.getSMTPServer().getConnectionTimeout(),
TimeUnit.MILLISECONDS);
+ if ( watch.getElapsed(TimeUnit.MILLISECONDS) <=
serverTimeout) {
+ // Will should allow for a couple of retries;
+
message.wait(messageContext.getSMTPServer().getConnectionTimeout() / 20);
+ } else {
+ logger.error("Did not receive the onTrigger
reponse within the acceptable timeframes");
+ throw new DropConnectionException(451, "The
processing of your message timed-out, we may have received it but you better
off sending it again");
--- End diff --
I don't know much about the ordering of "subethamail" aside from what
you've said, so I defer to you on that. That said, I think you're thinking a
level higher than I am. I am talking just about the interaction between
onTrigger() and data() and the handling of the message.
A third race condition exists. When onTrigger polls the queue, it pulls a
message off the queue. This removes the message from the queue, so it now
exists only as a variable in the onTrigger and data methods. In this time span
onTrigger has to write the message body to the content, which may take a
significant amount of time. Suppose the serverTimeout hits while the content is
being written. When this happens, data() throws the error telling the user to
retry (it should also remove the message from the queue, I commented on that).
onTrigger has no idea that the connection was dropped and will continue to
process the message (because it already holds it as a variable).
There may be other unwanted interactions because the message exists in both,
but this is the one that jumped out at me.
To get around this you may be able to use the message itself as the lock in
a synchronized block, or add more variables to SmtpEvent to allow bidirectional
communication between onTrigger and data().
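To make the second option concrete, here is a minimal sketch of the kind of
state I have in mind. It is only an illustration: `HandoffEvent`, its `State`
enum and the `try*` methods are hypothetical names, not anything in the PR or
in SmtpEvent today. The idea is that both sides race on one atomic state, so
exactly one of "committed" or "abandoned" wins.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the extra state SmtpEvent could carry.
// None of these names come from the PR.
final class HandoffEvent {
    enum State { QUEUED, CLAIMED, DONE, ABANDONED }

    private final AtomicReference<State> state = new AtomicReference<>(State.QUEUED);

    // onTrigger side: claim the message right after pulling it off the queue.
    boolean tryClaim() {
        return state.compareAndSet(State.QUEUED, State.CLAIMED);
    }

    // onTrigger side: commit only if data() has not given up in the meantime.
    boolean tryComplete() {
        return state.compareAndSet(State.CLAIMED, State.DONE);
    }

    // data() side: give up on timeout. Returns false if onTrigger already
    // completed, in which case the client should get a success reply instead.
    boolean tryAbandon() {
        while (true) {
            State current = state.get();
            if (current == State.DONE) {
                return false;
            }
            if (current == State.ABANDONED
                    || state.compareAndSet(current, State.ABANDONED)) {
                return true;
            }
        }
    }

    State state() {
        return state.get();
    }

    public static void main(String[] args) {
        // Timeout fires while onTrigger is still writing the content.
        HandoffEvent slow = new HandoffEvent();
        slow.tryClaim();
        slow.tryAbandon();
        System.out.println("slow commit accepted: " + slow.tryComplete()); // false

        // onTrigger finishes before the timeout fires.
        HandoffEvent fast = new HandoffEvent();
        fast.tryClaim();
        fast.tryComplete();
        System.out.println("late abandon accepted: " + fast.tryAbandon()); // false
    }
}
```

With something like this, onTrigger would roll back its work whenever
`tryComplete()` returns false, and data() would only send the 451 when
`tryAbandon()` returns true.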
Also, I don't think your assumptions about how the blocking queue works are
correct. If you take a look at this link[1], what makes it blocking is purely
that it will let you wait for data to exist in the queue before you take from
it (or wait to put data in if it is full). So if onTrigger calls poll(), it
will grab whatever is first in the queue (or get null right away if the queue
is empty). On the flip side, "offer" will put the data into the queue if there
is space; the variant with a timeout waits until the queue has space or the
timeout expires, while the variant without one returns false immediately.
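A small sketch of those semantics, using only the standard
`java.util.concurrent.LinkedBlockingQueue` calls. The capacity of 1 and the
50 ms timeouts are arbitrary values picked for the demonstration, not values
from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class QueueSemantics {
    // Returns what each call reported, in order, for a queue of capacity 1.
    static List<Object> observe() throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        List<Object> seen = new ArrayList<>();
        seen.add(queue.poll());                                // empty: null, no waiting
        seen.add(queue.offer("a"));                            // space available: true
        seen.add(queue.offer("b"));                            // full: false, no waiting
        seen.add(queue.offer("b", 50, TimeUnit.MILLISECONDS)); // full: false after the timeout
        seen.add(queue.poll());                                // head of the queue: "a"
        seen.add(queue.poll(50, TimeUnit.MILLISECONDS));       // empty: null after the timeout
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(observe()); // [null, true, false, false, a, null]
    }
}
```

Only `put()` and `take()` block indefinitely; `poll()` and `offer()` either
return at once or give up after the timeout you pass in.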
Lastly, I try not to make assumptions about timing (adding 5ms or waiting
20ms in a loop) because you never know what a user is going to do (maybe the
timeout is only 5ms) or what system they are running on. It is much safer to
not assume anything and instead base it only on things the user has configured
(like the server timeout). That way, if anything goes wrong, they can't blame
it on the system; it comes down to their configuration.
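As a rough sketch of waiting purely on the configured value: compute one
deadline from the timeout, then wait for whatever time remains, in a single
unit throughout. `ProcessedSignal` and its methods are hypothetical names for
illustration; in the PR this state would presumably live on SmtpEvent, and
`timeoutMillis` would come from the server's configured connection timeout.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not part of the PR.
final class ProcessedSignal {
    private boolean processed;

    // Called by the consumer once the message has been fully handled.
    synchronized void markProcessed() {
        processed = true;
        notifyAll();
    }

    // Waits until markProcessed() is called or the configured timeout elapses.
    // Returns true if the message was processed in time, false otherwise.
    synchronized boolean awaitProcessed(long timeoutMillis) throws InterruptedException {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!processed) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            // Wait only for the time that is left; the loop also guards
            // against spurious wakeups.
            TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        ProcessedSignal signal = new ProcessedSignal();
        new Thread(signal::markProcessed).start();
        System.out.println("processed in time: " + signal.awaitProcessed(1000));
        System.out.println("processed in time: " + new ProcessedSignal().awaitProcessed(20));
    }
}
```

There is no fixed slice or retry count here, so a user who configures a 5ms
timeout gets a 5ms wait and nothing else.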
[1] http://tutorials.jenkov.com/java-util-concurrent/blockingqueue.html