Github user trixpan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/483#discussion_r70180468
--- Diff:
nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
---
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.nifi.processors.email;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processors.email.smtp.event.SmtpEvent;
+import
org.apache.nifi.processors.email.smtp.handler.SMTPMessageHandlerFactory;
+import org.subethamail.smtp.server.SMTPServer;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.ssl.SSLContextService;
+
+@Tags({"listen", "email", "smtp"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("This processor implements a lightweight SMTP
server to an arbitrary port, " +
+ "allowing nifi to listen for incoming email. " +
+ "" +
+ "Note this server does not perform any email validation. If direct
exposure to the internet is sought," +
+ "it may be a better idea to use the combination of NiFi and an
industrial scale MTA (e.g. Postfix)")
+@WritesAttributes({
+ @WritesAttribute(attribute = "mime.type", description = "The value
used during HELO"),
+ @WritesAttribute(attribute = "smtp.helo", description = "The value
used during HELO"),
+ @WritesAttribute(attribute = "smtp.certificates.*.serial",
description = "The serial numbers for each of the " +
+ "certificates used by an TLS peer"),
+ @WritesAttribute(attribute = "smtp.certificates.*.principal",
description = "The principal for each of the " +
+ "certificates used by an TLS peer"),
+ @WritesAttribute(attribute = "smtp.from", description = "The value
used during MAIL FROM (i.e. envelope)"),
+ @WritesAttribute(attribute = "smtp.to", description = "The value
used during RCPT TO (i.e. envelope)")})
+
+public class ListenSMTP extends AbstractProcessor {
+ public static final String SMTP_HELO = "smtp.helo";
+ public static final String SMTP_FROM = "smtp.from";
+ public static final String SMTP_TO = "smtp.to";
+ public static final String MIME_TYPE = "message/rfc822";
+
+
+ protected static final PropertyDescriptor SMTP_PORT = new
PropertyDescriptor.Builder()
+ .name("SMTP_PORT")
+ .displayName("Listening Port")
+ .description("The TCP port the ListenSMTP processor will bind
to." +
+ "NOTE that on Unix derivative operating systems this
port must " +
+ "be higher than 1024 unless NiFi is running as with
root user permissions.")
+ .required(true)
+ .expressionLanguageSupported(false)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .build();
+
+ protected static final PropertyDescriptor SMTP_HOSTNAME = new
PropertyDescriptor.Builder()
+ .name("SMTP_HOSTNAME")
+ .displayName("SMTP hostname")
+ .description("The hostname to be embedded into the banner
displayed when an " +
+ "SMTP client connects to the processor TCP port .")
+ .required(true)
+ .expressionLanguageSupported(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ protected static final PropertyDescriptor SMTP_MAXIMUM_CONNECTIONS =
new PropertyDescriptor.Builder()
+ .name("SMTP_MAXIMUM_CONNECTIONS")
+ .displayName("Maximum number of SMTP connection")
+ .description("The maximum number of simultaneous SMTP
connections.")
+ .required(true)
+ .expressionLanguageSupported(false)
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .build();
+
+ protected static final PropertyDescriptor SMTP_TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("SMTP_TIMEOUT")
+ .displayName("SMTP connection timeout")
+ .description("The maximum time to wait for an action of SMTP
client.")
+ .defaultValue("10 seconds")
+ .required(true)
+ .expressionLanguageSupported(false)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ protected static final PropertyDescriptor SMTP_MAXIMUM_MSG_SIZE = new
PropertyDescriptor.Builder()
+ .name("SMTP_MAXIMUM_MSG_SIZE")
+ .displayName("SMTP Maximum Message Size")
+ .description("The maximum number of bytes the server will
accept.")
+ .required(true)
+ .defaultValue("20MB")
+ .expressionLanguageSupported(false)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new
PropertyDescriptor.Builder()
+ .name("SSL_CONTEXT_SERVICE")
+ .displayName("SSL Context Service")
+ .description("The Controller Service to use in order to obtain
an SSL Context. If this property is set, " +
+ "messages will be received over a secure connection.")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
+
+ public static final PropertyDescriptor CLIENT_AUTH = new
PropertyDescriptor.Builder()
+ .name("CLIENT_AUTH")
+ .displayName("Client Auth")
+ .description("The client authentication policy to use for the
SSL Context. Only used if an SSL Context Service is provided.")
+ .required(false)
+ .allowableValues(SSLContextService.ClientAuth.NONE.toString(),
SSLContextService.ClientAuth.REQUIRED.toString())
+ .build();
+
+ @Override
+ protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
+ final List<ValidationResult> results = new ArrayList<>();
+
+ final String clientAuth =
validationContext.getProperty(CLIENT_AUTH).getValue();
+ final SSLContextService sslContextService =
validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
+ if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
+ results.add(new ValidationResult.Builder()
+ .explanation("Client Auth must be provided when using
TLS/SSL")
+ .valid(false).subject("Client Auth").build());
+ }
+
+ return results;
+
+ }
+
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("Extraction was successful")
+ .build();
+
+ private Set<Relationship> relationships;
+ private List<PropertyDescriptor> propertyDescriptors;
+ private volatile LinkedBlockingQueue<SmtpEvent> messages;
+
+ private volatile SMTPServer server;
+ private AtomicBoolean initialized = new AtomicBoolean(false);
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ this.relationships = Collections.unmodifiableSet(relationships);
+
+ final List<PropertyDescriptor> props = new ArrayList<>();
+ props.add(SMTP_PORT);
+ props.add(SMTP_HOSTNAME);
+ props.add(SMTP_MAXIMUM_CONNECTIONS);
+ props.add(SMTP_TIMEOUT);
+ props.add(SMTP_MAXIMUM_MSG_SIZE);
+ props.add(SSL_CONTEXT_SERVICE);
+ props.add(CLIENT_AUTH);
+ this.propertyDescriptors = Collections.unmodifiableList(props);
+
+ }
+
+ final ComponentLog logger = getLogger();
+
+ // Upon Schedule, reset the initialized state to false
+ @OnScheduled
+ public void onScheduled(ProcessContext context) {
+ initialized.set(false);
+ }
+
+ protected synchronized void initializeSMTPServer(final ProcessContext
context) throws Exception {
+ if (initialized.get()) {
--- End diff --
addressed?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---