Github user trixpan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/827#discussion_r74414544
--- Diff:
nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
---
@@ -166,317 +108,158 @@
.identifiesControllerService(SSLContextService.class)
.build();
- public static final PropertyDescriptor CLIENT_AUTH = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor CLIENT_AUTH = new
PropertyDescriptor.Builder()
.name("CLIENT_AUTH")
.displayName("Client Auth")
.description("The client authentication policy to use for the
SSL Context. Only used if an SSL Context Service is provided.")
.required(false)
.allowableValues(SSLContextService.ClientAuth.NONE.toString(),
SSLContextService.ClientAuth.REQUIRED.toString())
.build();
- @Override
- protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
- final List<ValidationResult> results = new ArrayList<>();
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All new messages will be routed as FlowFiles to
this relationship")
+ .build();
- final String clientAuth =
validationContext.getProperty(CLIENT_AUTH).getValue();
- final SSLContextService sslContextService =
validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ private final static List<PropertyDescriptor> propertyDescriptors;
- if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
- results.add(new ValidationResult.Builder()
- .explanation("Client Auth must be provided when using
TLS/SSL")
- .valid(false).subject("Client Auth").build());
- }
+ private final static Set<Relationship> relationships;
- return results;
+ static {
+ List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+ _propertyDescriptors.add(SMTP_PORT);
+ _propertyDescriptors.add(SMTP_MAXIMUM_CONNECTIONS);
+ _propertyDescriptors.add(SMTP_TIMEOUT);
+ _propertyDescriptors.add(SMTP_MAXIMUM_MSG_SIZE);
+ _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
+ _propertyDescriptors.add(CLIENT_AUTH);
+ propertyDescriptors =
Collections.unmodifiableList(_propertyDescriptors);
+ Set<Relationship> _relationships = new HashSet<>();
+ _relationships.add(REL_SUCCESS);
+ relationships = Collections.unmodifiableSet(_relationships);
}
+ private volatile SMTPServer smtp;
- public static final Relationship REL_SUCCESS = new
Relationship.Builder()
- .name("success")
- .description("Extraction was successful")
- .build();
+ private volatile SmtpConsumer smtpConsumer;
- private Set<Relationship> relationships;
- private List<PropertyDescriptor> propertyDescriptors;
- private volatile LinkedBlockingQueue<SmtpEvent> incomingMessages;
+ /**
+ *
+ */
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSessionFactory
sessionFactory) throws ProcessException {
+ ProcessSession processSession = sessionFactory.createSession();
+ if (this.smtp == null) {
+ this.setupSmtpIfNecessary(context, processSession);
+ }
+
+ if (this.smtpConsumer.hasMessage()) {
+ try {
+ /*
+ * Will consume incoming message directly from the wire
and into
+ * FlowFile/Content repository before exiting. This
essentially
+ * limits any potential data loss by allowing SMTPServer
thread
+ * to actually commit NiFi session if all good. However in
the
+ * event of exception, such exception will be propagated
back to
+ * the email sender via "undeliverable message" allowing
such
+ * user to re-send the message
+ */
+ this.smtpConsumer.consumeUsing((inputDataStream) -> {
+ FlowFile flowFile = processSession.create();
+ AtomicInteger size = new AtomicInteger();
+ flowFile = processSession.write(flowFile, new
OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws
IOException {
+ size.set(IOUtils.copy(inputDataStream, out));
+ }
+ });
+
processSession.getProvenanceReporter().receive(flowFile, "smtp://"
+ + ListenSMTP.this.smtp.getHostName() + ":" +
ListenSMTP.this.smtp.getPort() + "/");
+ processSession.transfer(flowFile, REL_SUCCESS);
+ processSession.commit();
+ return size.get();
+ });
+ } catch (Exception e) {
+ this.getLogger().error("Failed while listenning for
messages.", e);
--- End diff --
typo: "listenning" should be "listening"
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---