Github user trixpan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/827#discussion_r74414544
  
    --- Diff: 
nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
 ---
    @@ -166,317 +108,158 @@
                 .identifiesControllerService(SSLContextService.class)
                 .build();
     
    -    public static final PropertyDescriptor CLIENT_AUTH = new 
PropertyDescriptor.Builder()
    +    static final PropertyDescriptor CLIENT_AUTH = new 
PropertyDescriptor.Builder()
                 .name("CLIENT_AUTH")
                 .displayName("Client Auth")
                 .description("The client authentication policy to use for the 
SSL Context. Only used if an SSL Context Service is provided.")
                 .required(false)
                 .allowableValues(SSLContextService.ClientAuth.NONE.toString(), 
SSLContextService.ClientAuth.REQUIRED.toString())
                 .build();
     
    -    @Override
    -    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
    -        final List<ValidationResult> results = new ArrayList<>();
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All new messages will be routed as FlowFiles to 
this relationship")
    +            .build();
     
    -        final String clientAuth = 
validationContext.getProperty(CLIENT_AUTH).getValue();
    -        final SSLContextService sslContextService = 
validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
    +    private final static List<PropertyDescriptor> propertyDescriptors;
     
    -        if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
    -            results.add(new ValidationResult.Builder()
    -                    .explanation("Client Auth must be provided when using 
TLS/SSL")
    -                    .valid(false).subject("Client Auth").build());
    -        }
    +    private final static Set<Relationship> relationships;
     
    -        return results;
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.add(SMTP_PORT);
    +        _propertyDescriptors.add(SMTP_MAXIMUM_CONNECTIONS);
    +        _propertyDescriptors.add(SMTP_TIMEOUT);
    +        _propertyDescriptors.add(SMTP_MAXIMUM_MSG_SIZE);
    +        _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
    +        _propertyDescriptors.add(CLIENT_AUTH);
    +        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
     
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        relationships = Collections.unmodifiableSet(_relationships);
         }
     
    +    private volatile SMTPServer smtp;
     
    -    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    -            .name("success")
    -            .description("Extraction was successful")
    -            .build();
    +    private volatile SmtpConsumer smtpConsumer;
     
    -    private Set<Relationship> relationships;
    -    private List<PropertyDescriptor> propertyDescriptors;
    -    private volatile LinkedBlockingQueue<SmtpEvent> incomingMessages;
    +    /**
    +     *
    +     */
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSessionFactory 
sessionFactory) throws ProcessException {
    +        ProcessSession processSession = sessionFactory.createSession();
    +        if (this.smtp == null) {
    +            this.setupSmtpIfNecessary(context, processSession);
    +        }
    +
    +        if (this.smtpConsumer.hasMessage()) {
    +            try {
    +                /*
    +                 * Will consume incoming message directly from the wire 
and into
    +                 * FlowFile/Content repository before exiting. This 
essentially
    +                 * limits any potential data loss by allowing SMTPServer 
thread
    +                 * to actually commit NiFi session if all good. However in 
the
    +                 * event of exception, such exception will be propagated 
back to
    +                 * the email sender via "undeliverable message" allowing 
such
    +                 * user to re-send the message
    +                 */
    +                this.smtpConsumer.consumeUsing((inputDataStream) -> {
    +                    FlowFile flowFile = processSession.create();
    +                    AtomicInteger size = new AtomicInteger();
    +                    flowFile = processSession.write(flowFile, new 
OutputStreamCallback() {
    +                        @Override
    +                        public void process(final OutputStream out) throws 
IOException {
    +                            size.set(IOUtils.copy(inputDataStream, out));
    +                        }
    +                    });
    +                    
processSession.getProvenanceReporter().receive(flowFile, "smtp://"
    +                            + ListenSMTP.this.smtp.getHostName() + ":" + 
ListenSMTP.this.smtp.getPort() + "/");
    +                    processSession.transfer(flowFile, REL_SUCCESS);
    +                    processSession.commit();
    +                    return size.get();
    +                });
    +            } catch (Exception e) {
    +                this.getLogger().error("Failed while listenning for 
messages.", e);
    --- End diff --
    
    typo


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to