Github user trixpan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/827#discussion_r74363458
  
    --- Diff: 
nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
 ---
    @@ -166,317 +108,158 @@
                 .identifiesControllerService(SSLContextService.class)
                 .build();
     
    -    public static final PropertyDescriptor CLIENT_AUTH = new 
PropertyDescriptor.Builder()
    +    static final PropertyDescriptor CLIENT_AUTH = new 
PropertyDescriptor.Builder()
                 .name("CLIENT_AUTH")
                 .displayName("Client Auth")
                 .description("The client authentication policy to use for the 
SSL Context. Only used if an SSL Context Service is provided.")
                 .required(false)
                 .allowableValues(SSLContextService.ClientAuth.NONE.toString(), 
SSLContextService.ClientAuth.REQUIRED.toString())
                 .build();
     
    -    @Override
    -    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
    -        final List<ValidationResult> results = new ArrayList<>();
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All new messages will be routed as FlowFiles to 
this relationship")
    +            .build();
     
    -        final String clientAuth = 
validationContext.getProperty(CLIENT_AUTH).getValue();
    -        final SSLContextService sslContextService = 
validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
    +    private final static List<PropertyDescriptor> propertyDescriptors;
     
    -        if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
    -            results.add(new ValidationResult.Builder()
    -                    .explanation("Client Auth must be provided when using 
TLS/SSL")
    -                    .valid(false).subject("Client Auth").build());
    -        }
    +    private final static Set<Relationship> relationships;
     
    -        return results;
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.add(SMTP_PORT);
    +        _propertyDescriptors.add(SMTP_MAXIMUM_CONNECTIONS);
    +        _propertyDescriptors.add(SMTP_TIMEOUT);
    +        _propertyDescriptors.add(SMTP_MAXIMUM_MSG_SIZE);
    +        _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
    +        _propertyDescriptors.add(CLIENT_AUTH);
    +        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
     
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        relationships = Collections.unmodifiableSet(_relationships);
         }
     
    +    private volatile SMTPServer smtp;
     
    -    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    -            .name("success")
    -            .description("Extraction was successful")
    -            .build();
    +    private volatile SmtpConsumer smtpConsumer;
     
    -    private Set<Relationship> relationships;
    -    private List<PropertyDescriptor> propertyDescriptors;
    -    private volatile LinkedBlockingQueue<SmtpEvent> incomingMessages;
    +    /**
    +     *
    +     */
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSessionFactory 
sessionFactory) throws ProcessException {
    +        ProcessSession processSession = sessionFactory.createSession();
    +        if (this.smtp == null) {
    +            this.setupSmtpIfNecessary(context, processSession);
    +        }
    +
    +        if (this.smtpConsumer.hasMessage()) {
    +            try {
    +                /*
    +                 * Will consume incoming message directly from the wire 
and into
    +                 * FlowFile/Content repository before exiting. This 
essentially
    +                 * limits any potential data loss by allowing SMTPServer 
thread
    +                 * to actually commit NiFi session if all good. However in 
the
    +                 * event of exception, such exception will be propagated 
back to
    +                 * the email sender via "undeliverable message" allowing 
such
    +                 * user to re-send the message
    +                 */
    +                this.smtpConsumer.consumeUsing((inputDataStream) -> {
    +                    FlowFile flowFile = processSession.create();
    +                    AtomicInteger size = new AtomicInteger();
    +                    flowFile = processSession.write(flowFile, new 
OutputStreamCallback() {
    +                        @Override
    +                        public void process(final OutputStream out) throws 
IOException {
    +                            size.set(IOUtils.copy(inputDataStream, out));
    +                        }
    +                    });
    +                    
processSession.getProvenanceReporter().receive(flowFile, "smtp://"
    +                            + ListenSMTP.this.smtp.getHostName() + ":" + 
ListenSMTP.this.smtp.getPort() + "/");
    +                    processSession.transfer(flowFile, REL_SUCCESS);
    +                    processSession.commit();
    +                    return size.get();
    +                });
    +            } catch (Exception e) {
    +                this.getLogger().error("Failed while listenning for 
messages.", e);
    +                processSession.rollback();
    +            }
    +        } else {
    +            context.yield();
    +        }
    +    }
     
    -    private volatile SMTPServer server;
    -    private AtomicBoolean initialized = new AtomicBoolean(false);
    -    private AtomicBoolean stopping = new AtomicBoolean(false);
    +    /**
    +     *
    +     */
    +    @OnStopped
    +    public void close() {
    +        this.getLogger().info("Stopping SMTPServer");
    +        this.smtp.stop();
    +        this.smtp = null;
    +        this.getLogger().info("SMTPServer stopped");
    +    }
     
    +    /**
    +     *
    +     */
         @Override
         public Set<Relationship> getRelationships() {
             return relationships;
         }
     
    +    /**
    +    *
    +    */
         @Override
         protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
             return propertyDescriptors;
         }
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final Set<Relationship> relationships = new HashSet<>();
    -        relationships.add(REL_SUCCESS);
    -        this.relationships = Collections.unmodifiableSet(relationships);
    -
    -        final List<PropertyDescriptor> props = new ArrayList<>();
    -        props.add(SMTP_PORT);
    -        props.add(SMTP_HOSTNAME);
    -        props.add(SMTP_MAXIMUM_CONNECTIONS);
    -        props.add(SMTP_TIMEOUT);
    -        props.add(SMTP_MAXIMUM_MSG_SIZE);
    -        props.add(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE);
    -        props.add(SSL_CONTEXT_SERVICE);
    -        props.add(CLIENT_AUTH);
    -        this.propertyDescriptors = Collections.unmodifiableList(props);
    -
    -    }
    -
    -    // Upon Schedule, reset the initialized state to false
    -    @OnScheduled
    -    public void onScheduled(ProcessContext context) {
    -        initialized.set(false);
    -        stopping.set(false);
    -    }
    -
    -    protected synchronized void initializeSMTPServer(final ProcessContext 
context) throws Exception {
    -
    -        // check if we are already running or if it is stopping
    -        if (initialized.get() && server.isRunning() || stopping.get() ) {
    -            return;
    -        }
    -
    -        incomingMessages = new 
LinkedBlockingQueue<>(context.getProperty(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE).asInteger());
    -
    -        String clientAuth = null;
    -
    -        // If an SSLContextService was provided then create an SSLContext 
to pass down to the server
    -        SSLContext sslContext = null;
    -        final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
    -        if (sslContextService != null) {
    -            clientAuth = context.getProperty(CLIENT_AUTH).getValue();
    -            sslContext = 
sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth));
    +    /**
    +     *
    +     */
    +    private synchronized void setupSmtpIfNecessary(ProcessContext context, 
ProcessSession processSession) {
    +        if (this.smtp == null) {
    +            SmtpConsumer consumer = new SmtpConsumer();
    +            SMTPServer smtpServer = this.createServerInstance(context, 
consumer);
    +            smtpServer.setSoftwareName("Apache NiFi");
    +            smtpServer.setPort(context.getProperty(SMTP_PORT).asInteger());
    +            
smtpServer.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue());
    +            
smtpServer.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
    +
    +            this.smtpConsumer = consumer;
    +            this.smtp = smtpServer;
    +            this.smtp.start();
             }
    +    }
     
    -        final SSLContext finalSslContext = sslContext;
    -
    -        SMTPMessageHandlerFactory smtpMessageHandlerFactory = new 
SMTPMessageHandlerFactory(incomingMessages, getLogger());
    -        final SMTPServer server = new 
SMTPServer(smtpMessageHandlerFactory) {
    -
    +    /**
    +     *
    +     */
    +    private SMTPServer createServerInstance(ProcessContext context, 
SmtpConsumer consumer) {
    +        SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
    +        SMTPServer smtpServer = sslContextService == null ? new 
SMTPServer(consumer) : new SMTPServer(consumer) {
                 @Override
                 public SSLSocket createSSLSocket(Socket socket) throws 
IOException {
                     InetSocketAddress remoteAddress = (InetSocketAddress) 
socket.getRemoteSocketAddress();
    -
    -                SSLSocketFactory socketFactory = 
finalSslContext.getSocketFactory();
    -
    -                SSLSocket s = (SSLSocket) 
(socketFactory.createSocket(socket, remoteAddress.getHostName(), 
socket.getPort(), true));
    -
    -                s.setUseClientMode(false);
    -
    -
    -                // For some reason the createSSLContext above is not 
enough to enforce
    -                // client side auth
    -                // If client auth is required...
    -                if 
(SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue()))
 {
    -                    s.setNeedClientAuth(true);
    +                String clientAuth = 
context.getProperty(CLIENT_AUTH).getValue();
    +                SSLContext sslContext = 
sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth));
    +                SSLSocketFactory socketFactory = 
sslContext.getSocketFactory();
    +                SSLSocket sslSocket = (SSLSocket) 
(socketFactory.createSocket(socket, 
remoteAddress.getHostName(),socket.getPort(), true));
    +                sslSocket.setUseClientMode(false);
    +
    +                if 
(SSLContextService.ClientAuth.REQUIRED.toString().equals(clientAuth)) {
    +                    this.setRequireTLS(true);
    +                    sslSocket.setNeedClientAuth(true);
                     }
     
    -
    -                return s;
    +                return sslSocket;
                 }
             };
    -
    -        // Set some parameters to our server
    -        server.setSoftwareName("Apache NiFi");
    -
    -
    -        // Set the Server options based on properties
    -        server.setPort(context.getProperty(SMTP_PORT).asInteger());
    -        server.setHostName(context.getProperty(SMTP_HOSTNAME).getValue());
    -        
server.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue());
    -        
server.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger());
    -        
server.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
    -
    -
    -        // Check if TLS should be enabled
             if (sslContextService != null) {
    -            server.setEnableTLS(true);
    -        } else {
    -            server.setHideTLS(true);
    -        }
    -
    -        // Set TLS to required in case CLIENT_AUTH = required
    -        if 
(SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue()))
 {
    -            server.setRequireTLS(true);
    -        }
    -
    -        this.server = server;
    -        server.start();
    -
    -        getLogger().info("Server started and listening on port " + 
server.getPort());
    -
    -        initialized.set(true);
    -        stopping.set(false);
    -    }
    -
    -    @OnUnscheduled
    -    public void startShutdown() throws Exception {
    -        if (server != null) {
    -            stopping.set(true);
    -            getLogger().info("Shutting down processor P{}", new 
Object[]{server});
    -            server.stop();
    -            getLogger().info("Shut down {}", new Object[]{server});
    -        }
    -    }
    -
    -    @OnStopped
    -    public void completeShutdown() throws Exception {
    -        if (server != null) {
    -            if (!server.isRunning() && stopping.get() ) {
    -                stopping.set(false);
    -            }
    -            getLogger().info("Completed shut down {}", new 
Object[]{server});
    -        }
    -    }
    -
    -    @Override
    -    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    -
    -        try {
    -            initializeSMTPServer(context);
    -        } catch (Exception e) {
    -            context.yield();
    -            throw new ProcessException("Failed to initialize the SMTP 
server", e);
    -        }
    -
    -        while (!incomingMessages.isEmpty()) {
    -            SmtpEvent message = incomingMessages.poll();
    -
    -            if (message == null) {
    -                return;
    -            }
    -
    -            synchronized (message) {
    -                if (resultCodeSetAndIsError(message)) {
    -                    SMTPResultCode resultCode = 
SMTPResultCode.fromCode(message.getReturnCode());
    -                    getLogger().warn("Message failed before onTrigger 
processing message was: " + resultCode.getLogMessage());
    -                    continue;
    -                }
    -
    -                try {
    -                    FlowFile flowfile = session.create();
    -
    -                    if (message.getMessageData() != null) {
    -                        flowfile = session.write(flowfile, out -> {
    -                            InputStream inputStream = 
message.getMessageData();
    -                            byte [] buffer = new byte[1024];
    -
    -                            int rd;
    -                            long totalBytesRead =0;
    -
    -                            while ((rd = inputStream.read(buffer, 0, 
buffer.length)) != -1 ) {
    -                                totalBytesRead += rd;
    -                                if (totalBytesRead > 
server.getMaxMessageSize() ) {
    -                                    message.setReturnCode(500);
    -                                    message.setProcessed();
    -                                    break;
    -                                }
    -                                out.write(buffer, 0, rd);
    -                            }
    -                            out.flush();
    -                        });
    -                    } else {
    -                        getLogger().debug("Message body was null");
    -                        
message.setReturnCode(SMTPResultCode.UNKNOWN_ERROR_CODE.getCode());
    -                        message.setProcessed();
    -                    }
    -
    -                    if (!message.getProcessed()) {
    -                        HashMap<String, String> attributes = new 
HashMap<>();
    -                        // Gather message attributes
    -                        attributes.put(SMTP_HELO, message.getHelo());
    -                        attributes.put(SMTP_SRC_IP, message.getHelo());
    -                        attributes.put(SMTP_FROM, message.getFrom());
    -                        attributes.put(SMTP_TO, message.getTo());
    -
    -                        List<Map<String, String>> details = 
message.getCertifcateDetails();
    -                        int c = 0;
    -
    -                        // Add a selection of each X509 certificates to 
the already gathered attributes
    -
    -                        for (Map<String, String> detail : details) {
    -                            attributes.put("smtp.certificate." + c + 
".serial", detail.getOrDefault("SerialNumber", null));
    -                            attributes.put("smtp.certificate." + c + 
".subjectName", detail.getOrDefault("SubjectName", null));
    --- End diff --
    
    It boils down to chain of custody. 
    
    Imagine Joe Blogs and Luo Cipher both work for ACME corp and are issued 
certificated to send messages to my NiFi instance using TLS and requiring 
client Auth.
    
    Since both are authorised, you want to distinguish from your authorised 
users who sent a particular email to NiFi, hence you store the subjectName used 
to send the message.
    
    This by the way is the same approach used around NiFi
    
    
![image](https://cloud.githubusercontent.com/assets/3108527/17577508/ec07a332-5fc1-11e6-8d1a-50bb29edc9e2.png)
    
    
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.HandleHttpRequest/index.html


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to