Github user olegz commented on a diff in the pull request:
https://github.com/apache/nifi/pull/827#discussion_r74362408
--- Diff:
nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
---
@@ -166,317 +108,158 @@
.identifiesControllerService(SSLContextService.class)
.build();
- public static final PropertyDescriptor CLIENT_AUTH = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor CLIENT_AUTH = new
PropertyDescriptor.Builder()
.name("CLIENT_AUTH")
.displayName("Client Auth")
.description("The client authentication policy to use for the
SSL Context. Only used if an SSL Context Service is provided.")
.required(false)
.allowableValues(SSLContextService.ClientAuth.NONE.toString(),
SSLContextService.ClientAuth.REQUIRED.toString())
.build();
- @Override
- protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
- final List<ValidationResult> results = new ArrayList<>();
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All new messages will be routed as FlowFiles to
this relationship")
+ .build();
- final String clientAuth =
validationContext.getProperty(CLIENT_AUTH).getValue();
- final SSLContextService sslContextService =
validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ private final static List<PropertyDescriptor> propertyDescriptors;
- if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
- results.add(new ValidationResult.Builder()
- .explanation("Client Auth must be provided when using
TLS/SSL")
- .valid(false).subject("Client Auth").build());
- }
+ private final static Set<Relationship> relationships;
- return results;
+ static {
+ List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+ _propertyDescriptors.add(SMTP_PORT);
+ _propertyDescriptors.add(SMTP_MAXIMUM_CONNECTIONS);
+ _propertyDescriptors.add(SMTP_TIMEOUT);
+ _propertyDescriptors.add(SMTP_MAXIMUM_MSG_SIZE);
+ _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
+ _propertyDescriptors.add(CLIENT_AUTH);
+ propertyDescriptors =
Collections.unmodifiableList(_propertyDescriptors);
+ Set<Relationship> _relationships = new HashSet<>();
+ _relationships.add(REL_SUCCESS);
+ relationships = Collections.unmodifiableSet(_relationships);
}
+ private volatile SMTPServer smtp;
- public static final Relationship REL_SUCCESS = new
Relationship.Builder()
- .name("success")
- .description("Extraction was successful")
- .build();
+ private volatile SmtpConsumer smtpConsumer;
- private Set<Relationship> relationships;
- private List<PropertyDescriptor> propertyDescriptors;
- private volatile LinkedBlockingQueue<SmtpEvent> incomingMessages;
+ /**
+ *
+ */
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSessionFactory
sessionFactory) throws ProcessException {
+ ProcessSession processSession = sessionFactory.createSession();
+ if (this.smtp == null) {
+ this.setupSmtpIfNecessary(context, processSession);
+ }
+
+ if (this.smtpConsumer.hasMessage()) {
+ try {
+ /*
+ * Will consume incoming message directly from the wire
and into
+ * FlowFile/Content repository before exiting. This
essentially
+ * limits any potential data loss by allowing SMTPServer
thread
+ * to actually commit NiFi session if all good. However in
the
+ * event of exception, such exception will be propagated
back to
+ * the email sender via "undeliverable message" allowing
such
+ * user to re-send the message
+ */
+ this.smtpConsumer.consumeUsing((inputDataStream) -> {
+ FlowFile flowFile = processSession.create();
+ AtomicInteger size = new AtomicInteger();
+ flowFile = processSession.write(flowFile, new
OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws
IOException {
+ size.set(IOUtils.copy(inputDataStream, out));
+ }
+ });
+
processSession.getProvenanceReporter().receive(flowFile, "smtp://"
+ + ListenSMTP.this.smtp.getHostName() + ":" +
ListenSMTP.this.smtp.getPort() + "/");
+ processSession.transfer(flowFile, REL_SUCCESS);
+ processSession.commit();
+ return size.get();
+ });
+ } catch (Exception e) {
+ this.getLogger().error("Failed while listenning for
messages.", e);
+ processSession.rollback();
+ }
+ } else {
+ context.yield();
+ }
+ }
- private volatile SMTPServer server;
- private AtomicBoolean initialized = new AtomicBoolean(false);
- private AtomicBoolean stopping = new AtomicBoolean(false);
+ /**
+ *
+ */
+ @OnStopped
+ public void close() {
+ this.getLogger().info("Stopping SMTPServer");
+ this.smtp.stop();
+ this.smtp = null;
+ this.getLogger().info("SMTPServer stopped");
+ }
+ /**
+ *
+ */
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
+ /**
+ *
+ */
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
- @Override
- protected void init(final ProcessorInitializationContext context) {
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- this.relationships = Collections.unmodifiableSet(relationships);
-
- final List<PropertyDescriptor> props = new ArrayList<>();
- props.add(SMTP_PORT);
- props.add(SMTP_HOSTNAME);
- props.add(SMTP_MAXIMUM_CONNECTIONS);
- props.add(SMTP_TIMEOUT);
- props.add(SMTP_MAXIMUM_MSG_SIZE);
- props.add(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE);
- props.add(SSL_CONTEXT_SERVICE);
- props.add(CLIENT_AUTH);
- this.propertyDescriptors = Collections.unmodifiableList(props);
-
- }
-
- // Upon Schedule, reset the initialized state to false
- @OnScheduled
- public void onScheduled(ProcessContext context) {
- initialized.set(false);
- stopping.set(false);
- }
-
- protected synchronized void initializeSMTPServer(final ProcessContext
context) throws Exception {
-
- // check if we are already running or if it is stopping
- if (initialized.get() && server.isRunning() || stopping.get() ) {
- return;
- }
-
- incomingMessages = new
LinkedBlockingQueue<>(context.getProperty(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE).asInteger());
-
- String clientAuth = null;
-
- // If an SSLContextService was provided then create an SSLContext
to pass down to the server
- SSLContext sslContext = null;
- final SSLContextService sslContextService =
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
- if (sslContextService != null) {
- clientAuth = context.getProperty(CLIENT_AUTH).getValue();
- sslContext =
sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth));
+ /**
+ *
+ */
+ private synchronized void setupSmtpIfNecessary(ProcessContext context,
ProcessSession processSession) {
+ if (this.smtp == null) {
+ SmtpConsumer consumer = new SmtpConsumer();
+ SMTPServer smtpServer = this.createServerInstance(context,
consumer);
+ smtpServer.setSoftwareName("Apache NiFi");
+ smtpServer.setPort(context.getProperty(SMTP_PORT).asInteger());
+
smtpServer.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue());
+
smtpServer.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+
+ this.smtpConsumer = consumer;
+ this.smtp = smtpServer;
+ this.smtp.start();
}
+ }
- final SSLContext finalSslContext = sslContext;
-
- SMTPMessageHandlerFactory smtpMessageHandlerFactory = new
SMTPMessageHandlerFactory(incomingMessages, getLogger());
- final SMTPServer server = new
SMTPServer(smtpMessageHandlerFactory) {
-
+ /**
+ *
+ */
+ private SMTPServer createServerInstance(ProcessContext context,
SmtpConsumer consumer) {
+ SSLContextService sslContextService =
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ SMTPServer smtpServer = sslContextService == null ? new
SMTPServer(consumer) : new SMTPServer(consumer) {
@Override
public SSLSocket createSSLSocket(Socket socket) throws
IOException {
InetSocketAddress remoteAddress = (InetSocketAddress)
socket.getRemoteSocketAddress();
-
- SSLSocketFactory socketFactory =
finalSslContext.getSocketFactory();
-
- SSLSocket s = (SSLSocket)
(socketFactory.createSocket(socket, remoteAddress.getHostName(),
socket.getPort(), true));
-
- s.setUseClientMode(false);
-
-
- // For some reason the createSSLContext above is not
enough to enforce
- // client side auth
- // If client auth is required...
- if
(SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue()))
{
- s.setNeedClientAuth(true);
+ String clientAuth =
context.getProperty(CLIENT_AUTH).getValue();
+ SSLContext sslContext =
sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth));
+ SSLSocketFactory socketFactory =
sslContext.getSocketFactory();
+ SSLSocket sslSocket = (SSLSocket)
(socketFactory.createSocket(socket,
remoteAddress.getHostName(),socket.getPort(), true));
+ sslSocket.setUseClientMode(false);
+
+ if
(SSLContextService.ClientAuth.REQUIRED.toString().equals(clientAuth)) {
+ this.setRequireTLS(true);
+ sslSocket.setNeedClientAuth(true);
}
-
- return s;
+ return sslSocket;
}
};
-
- // Set some parameters to our server
- server.setSoftwareName("Apache NiFi");
-
-
- // Set the Server options based on properties
- server.setPort(context.getProperty(SMTP_PORT).asInteger());
- server.setHostName(context.getProperty(SMTP_HOSTNAME).getValue());
-
server.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue());
-
server.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger());
-
server.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
-
-
- // Check if TLS should be enabled
if (sslContextService != null) {
- server.setEnableTLS(true);
- } else {
- server.setHideTLS(true);
- }
-
- // Set TLS to required in case CLIENT_AUTH = required
- if
(SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue()))
{
- server.setRequireTLS(true);
- }
-
- this.server = server;
- server.start();
-
- getLogger().info("Server started and listening on port " +
server.getPort());
-
- initialized.set(true);
- stopping.set(false);
- }
-
- @OnUnscheduled
- public void startShutdown() throws Exception {
- if (server != null) {
- stopping.set(true);
- getLogger().info("Shutting down processor P{}", new
Object[]{server});
- server.stop();
- getLogger().info("Shut down {}", new Object[]{server});
- }
- }
-
- @OnStopped
- public void completeShutdown() throws Exception {
- if (server != null) {
- if (!server.isRunning() && stopping.get() ) {
- stopping.set(false);
- }
- getLogger().info("Completed shut down {}", new
Object[]{server});
- }
- }
-
- @Override
- public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
-
- try {
- initializeSMTPServer(context);
- } catch (Exception e) {
- context.yield();
- throw new ProcessException("Failed to initialize the SMTP
server", e);
- }
-
- while (!incomingMessages.isEmpty()) {
- SmtpEvent message = incomingMessages.poll();
-
- if (message == null) {
- return;
- }
-
- synchronized (message) {
- if (resultCodeSetAndIsError(message)) {
- SMTPResultCode resultCode =
SMTPResultCode.fromCode(message.getReturnCode());
- getLogger().warn("Message failed before onTrigger
processing message was: " + resultCode.getLogMessage());
- continue;
- }
-
- try {
- FlowFile flowfile = session.create();
-
- if (message.getMessageData() != null) {
- flowfile = session.write(flowfile, out -> {
- InputStream inputStream =
message.getMessageData();
- byte [] buffer = new byte[1024];
-
- int rd;
- long totalBytesRead =0;
-
- while ((rd = inputStream.read(buffer, 0,
buffer.length)) != -1 ) {
- totalBytesRead += rd;
- if (totalBytesRead >
server.getMaxMessageSize() ) {
- message.setReturnCode(500);
- message.setProcessed();
- break;
- }
- out.write(buffer, 0, rd);
- }
- out.flush();
- });
- } else {
- getLogger().debug("Message body was null");
-
message.setReturnCode(SMTPResultCode.UNKNOWN_ERROR_CODE.getCode());
- message.setProcessed();
- }
-
- if (!message.getProcessed()) {
- HashMap<String, String> attributes = new
HashMap<>();
- // Gather message attributes
- attributes.put(SMTP_HELO, message.getHelo());
- attributes.put(SMTP_SRC_IP, message.getHelo());
- attributes.put(SMTP_FROM, message.getFrom());
- attributes.put(SMTP_TO, message.getTo());
-
- List<Map<String, String>> details =
message.getCertifcateDetails();
- int c = 0;
-
- // Add a selection of each X509 certificates to
the already gathered attributes
-
- for (Map<String, String> detail : details) {
- attributes.put("smtp.certificate." + c +
".serial", detail.getOrDefault("SerialNumber", null));
- attributes.put("smtp.certificate." + c +
".subjectName", detail.getOrDefault("SubjectName", null));
--- End diff --
Not necessarily against it, but could you elaborate some more on the
problem we are trying to solve here, primarily from a practical standpoint?
Just so you understand, for the initial implementation of a processor (any
processor) the idea is to keep it as simple as we can and to provide basic
functionality that is usable, yet may require some improvements and additional
features. Those are very easy to add. However, keep in mind that removing
features is practically impossible, as it always results in breaking backward
compatibility. So unless it is essential, I would rather defer it to the future,
until we have a solid and practical use case, rather than stick it in there
"just in case", then find an issue with it later and face the difficult
decision of breaking backward compatibility, especially when dealing with file
attributes.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---