Github user trixpan commented on a diff in the pull request: https://github.com/apache/nifi/pull/827#discussion_r74363458 --- Diff: nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java --- @@ -166,317 +108,158 @@ .identifiesControllerService(SSLContextService.class) .build(); - public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() + static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() .name("CLIENT_AUTH") .displayName("Client Auth") .description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.") .required(false) .allowableValues(SSLContextService.ClientAuth.NONE.toString(), SSLContextService.ClientAuth.REQUIRED.toString()) .build(); - @Override - protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { - final List<ValidationResult> results = new ArrayList<>(); + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All new messages will be routed as FlowFiles to this relationship") + .build(); - final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue(); - final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + private final static List<PropertyDescriptor> propertyDescriptors; - if (sslContextService != null && StringUtils.isBlank(clientAuth)) { - results.add(new ValidationResult.Builder() - .explanation("Client Auth must be provided when using TLS/SSL") - .valid(false).subject("Client Auth").build()); - } + private final static Set<Relationship> relationships; - return results; + static { + List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.add(SMTP_PORT); + _propertyDescriptors.add(SMTP_MAXIMUM_CONNECTIONS); + _propertyDescriptors.add(SMTP_TIMEOUT); + _propertyDescriptors.add(SMTP_MAXIMUM_MSG_SIZE); + _propertyDescriptors.add(SSL_CONTEXT_SERVICE); + _propertyDescriptors.add(CLIENT_AUTH); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + Set<Relationship> _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + relationships = Collections.unmodifiableSet(_relationships); } + private volatile SMTPServer smtp; - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Extraction was successful") - .build(); + private volatile SmtpConsumer smtpConsumer; - private Set<Relationship> relationships; - private List<PropertyDescriptor> propertyDescriptors; - private volatile LinkedBlockingQueue<SmtpEvent> incomingMessages; + /** + * + */ + @Override + public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { + ProcessSession processSession = sessionFactory.createSession(); + if (this.smtp == null) { + this.setupSmtpIfNecessary(context, processSession); + } + + if (this.smtpConsumer.hasMessage()) { + try { + /* + * Will consume incoming message directly from the wire and into + * FlowFile/Content repository before exiting. This essentially + * limits any potential data loss by allowing SMTPServer thread + * to actually commit NiFi session if all good. However in the + * event of exception, such exception will be propagated back to + * the email sender via "undeliverable message" allowing such + * user to re-send the message + */ + this.smtpConsumer.consumeUsing((inputDataStream) -> { + FlowFile flowFile = processSession.create(); + AtomicInteger size = new AtomicInteger(); + flowFile = processSession.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + size.set(IOUtils.copy(inputDataStream, out)); + } + }); + processSession.getProvenanceReporter().receive(flowFile, "smtp://" + + ListenSMTP.this.smtp.getHostName() + ":" + ListenSMTP.this.smtp.getPort() + "/"); + processSession.transfer(flowFile, REL_SUCCESS); + processSession.commit(); + return size.get(); + }); + } catch (Exception e) { + this.getLogger().error("Failed while listenning for messages.", e); + processSession.rollback(); + } + } else { + context.yield(); + } + } - private volatile SMTPServer server; - private AtomicBoolean initialized = new AtomicBoolean(false); - private AtomicBoolean stopping = new AtomicBoolean(false); + /** + * + */ + @OnStopped + public void close() { + this.getLogger().info("Stopping SMTPServer"); + this.smtp.stop(); + this.smtp = null; + this.getLogger().info("SMTPServer stopped"); + } + /** + * + */ @Override public Set<Relationship> getRelationships() { return relationships; } + /** + * + */ @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { return propertyDescriptors; } - @Override - protected void init(final ProcessorInitializationContext context) { - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - this.relationships = Collections.unmodifiableSet(relationships); - - final List<PropertyDescriptor> props = new ArrayList<>(); - props.add(SMTP_PORT); - props.add(SMTP_HOSTNAME); - props.add(SMTP_MAXIMUM_CONNECTIONS); - props.add(SMTP_TIMEOUT); - props.add(SMTP_MAXIMUM_MSG_SIZE); - props.add(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE); - props.add(SSL_CONTEXT_SERVICE); - props.add(CLIENT_AUTH); - this.propertyDescriptors = Collections.unmodifiableList(props); - - } - - // Upon Schedule, reset the initialized state to false - @OnScheduled - public void onScheduled(ProcessContext context) { - initialized.set(false); - stopping.set(false); - } - - protected synchronized void initializeSMTPServer(final ProcessContext context) throws Exception { - - // check if we are already running or if it is stopping - if (initialized.get() && server.isRunning() || stopping.get() ) { - return; - } - - incomingMessages = new LinkedBlockingQueue<>(context.getProperty(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE).asInteger()); - - String clientAuth = null; - - // If an SSLContextService was provided then create an SSLContext to pass down to the server - SSLContext sslContext = null; - final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - if (sslContextService != null) { - clientAuth = context.getProperty(CLIENT_AUTH).getValue(); - sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth)); + /** + * + */ + private synchronized void setupSmtpIfNecessary(ProcessContext context, ProcessSession processSession) { + if (this.smtp == null) { + SmtpConsumer consumer = new SmtpConsumer(); + SMTPServer smtpServer = this.createServerInstance(context, consumer); + smtpServer.setSoftwareName("Apache NiFi"); + smtpServer.setPort(context.getProperty(SMTP_PORT).asInteger()); + smtpServer.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue()); + smtpServer.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + + this.smtpConsumer = consumer; + this.smtp = smtpServer; + this.smtp.start(); } + } - final SSLContext finalSslContext = sslContext; - - SMTPMessageHandlerFactory smtpMessageHandlerFactory = new SMTPMessageHandlerFactory(incomingMessages, getLogger()); - final SMTPServer server = new SMTPServer(smtpMessageHandlerFactory) { - + /** + * + */ + private SMTPServer createServerInstance(ProcessContext context, SmtpConsumer consumer) { + SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + SMTPServer smtpServer = sslContextService == null ? new SMTPServer(consumer) : new SMTPServer(consumer) { @Override public SSLSocket createSSLSocket(Socket socket) throws IOException { InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); - - SSLSocketFactory socketFactory = finalSslContext.getSocketFactory(); - - SSLSocket s = (SSLSocket) (socketFactory.createSocket(socket, remoteAddress.getHostName(), socket.getPort(), true)); - - s.setUseClientMode(false); - - - // For some reason the createSSLContext above is not enough to enforce - // client side auth - // If client auth is required... - if (SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue())) { - s.setNeedClientAuth(true); + String clientAuth = context.getProperty(CLIENT_AUTH).getValue(); + SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth)); + SSLSocketFactory socketFactory = sslContext.getSocketFactory(); + SSLSocket sslSocket = (SSLSocket) (socketFactory.createSocket(socket, remoteAddress.getHostName(),socket.getPort(), true)); + sslSocket.setUseClientMode(false); + + if (SSLContextService.ClientAuth.REQUIRED.toString().equals(clientAuth)) { + this.setRequireTLS(true); + sslSocket.setNeedClientAuth(true); } - - return s; + return sslSocket; } }; - - // Set some parameters to our server - server.setSoftwareName("Apache NiFi"); - - - // Set the Server options based on properties - server.setPort(context.getProperty(SMTP_PORT).asInteger()); - server.setHostName(context.getProperty(SMTP_HOSTNAME).getValue()); - server.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue()); - server.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger()); - server.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); - - - // Check if TLS should be enabled if (sslContextService != null) { - server.setEnableTLS(true); - } else { - server.setHideTLS(true); - } - - // Set TLS to required in case CLIENT_AUTH = required - if (SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue())) { - server.setRequireTLS(true); - } - - this.server = server; - server.start(); - - getLogger().info("Server started and listening on port " + server.getPort()); - - initialized.set(true); - stopping.set(false); - } - - @OnUnscheduled - public void startShutdown() throws Exception { - if (server != null) { - stopping.set(true); - getLogger().info("Shutting down processor P{}", new Object[]{server}); - server.stop(); - getLogger().info("Shut down {}", new Object[]{server}); - } - } - - @OnStopped - public void completeShutdown() throws Exception { - if (server != null) { - if (!server.isRunning() && stopping.get() ) { - stopping.set(false); - } - getLogger().info("Completed shut down {}", new Object[]{server}); - } - } - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - - try { - initializeSMTPServer(context); - } catch (Exception e) { - context.yield(); - throw new ProcessException("Failed to initialize the SMTP server", e); - } - - while (!incomingMessages.isEmpty()) { - SmtpEvent message = incomingMessages.poll(); - - if (message == null) { - return; - } - - synchronized (message) { - if (resultCodeSetAndIsError(message)) { - SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode()); - getLogger().warn("Message failed before onTrigger processing message was: " + resultCode.getLogMessage()); - continue; - } - - try { - FlowFile flowfile = session.create(); - - if (message.getMessageData() != null) { - flowfile = session.write(flowfile, out -> { - InputStream inputStream = message.getMessageData(); - byte [] buffer = new byte[1024]; - - int rd; - long totalBytesRead =0; - - while ((rd = inputStream.read(buffer, 0, buffer.length)) != -1 ) { - totalBytesRead += rd; - if (totalBytesRead > server.getMaxMessageSize() ) { - message.setReturnCode(500); - message.setProcessed(); - break; - } - out.write(buffer, 0, rd); - } - out.flush(); - }); - } else { - getLogger().debug("Message body was null"); - message.setReturnCode(SMTPResultCode.UNKNOWN_ERROR_CODE.getCode()); - message.setProcessed(); - } - - if (!message.getProcessed()) { - HashMap<String, String> attributes = new HashMap<>(); - // Gather message attributes - attributes.put(SMTP_HELO, message.getHelo()); - attributes.put(SMTP_SRC_IP, message.getHelo()); - attributes.put(SMTP_FROM, message.getFrom()); - attributes.put(SMTP_TO, message.getTo()); - - List<Map<String, String>> details = message.getCertifcateDetails(); - int c = 0; - - // Add a selection of each X509 certificates to the already gathered attributes - - for (Map<String, String> detail : details) { - attributes.put("smtp.certificate." + c + ".serial", detail.getOrDefault("SerialNumber", null)); - attributes.put("smtp.certificate." + c + ".subjectName", detail.getOrDefault("SubjectName", null)); --- End diff -- It boils down to chain of custody. Imagine Joe Blogs and Luo Cipher both work for ACME corp and are issued certificated to send messages to my NiFi instance using TLS and requiring client Auth. Since both are authorised, you want to distinguish from your authorised users who sent a particular email to NiFi, hence you store the subjectName used to send the message. This by the way is the same approach used around NiFi ![image](https://cloud.githubusercontent.com/assets/3108527/17577508/ec07a332-5fc1-11e6-8d1a-50bb29edc9e2.png) https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.HandleHttpRequest/index.html
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---