http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java index 0000000,ee16610..adea255 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java @@@ -1,0 -1,904 +1,904 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.standard; + + import java.io.File; + import java.io.FileInputStream; + import java.io.IOException; + import java.io.InputStream; + import java.io.OutputStream; + import java.net.MalformedURLException; + import java.security.KeyManagementException; + import java.security.KeyStore; + import java.security.KeyStoreException; + import java.security.NoSuchAlgorithmException; + import java.security.UnrecoverableKeyException; + import java.security.cert.CertificateException; + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.UUID; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicReference; + import java.util.regex.Pattern; + + import javax.net.ssl.SSLContext; + import javax.net.ssl.SSLPeerUnverifiedException; + import javax.net.ssl.SSLSession; + import javax.security.cert.X509Certificate; + import javax.servlet.http.HttpServletResponse; + + import org.apache.http.Header; + import org.apache.http.HttpException; + import org.apache.http.HttpResponse; + import org.apache.http.HttpResponseInterceptor; + import org.apache.http.auth.AuthScope; + import org.apache.http.auth.UsernamePasswordCredentials; + import org.apache.http.client.CredentialsProvider; + import org.apache.http.client.HttpClient; + import org.apache.http.client.config.RequestConfig; + import org.apache.http.client.methods.CloseableHttpResponse; + import org.apache.http.client.methods.HttpDelete; + import org.apache.http.client.methods.HttpHead; + import org.apache.http.client.methods.HttpPost; + import org.apache.http.config.Registry; + import org.apache.http.config.RegistryBuilder; + import org.apache.http.conn.HttpClientConnectionManager; + import org.apache.http.conn.ManagedHttpClientConnection; + import org.apache.http.conn.socket.ConnectionSocketFactory; + import org.apache.http.conn.ssl.SSLConnectionSocketFactory; + import org.apache.http.conn.ssl.SSLContexts; + import org.apache.http.conn.ssl.TrustSelfSignedStrategy; + import org.apache.http.entity.ContentProducer; + import org.apache.http.entity.EntityTemplate; + import org.apache.http.impl.client.BasicCredentialsProvider; + import org.apache.http.impl.client.CloseableHttpClient; + import org.apache.http.impl.client.HttpClientBuilder; + import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; + import org.apache.http.protocol.HttpContext; + import org.apache.http.protocol.HttpCoreContext; + import org.apache.http.util.EntityUtils; + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.ValidationContext; + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.DataUnit; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.OnScheduled; -import org.apache.nifi.processor.annotation.OnStopped; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.lifecycle.OnScheduled; ++import org.apache.nifi.annotation.lifecycle.OnStopped; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.io.InputStreamCallback; + import org.apache.nifi.processor.util.StandardValidators; + import org.apache.nifi.ssl.SSLContextService; + import org.apache.nifi.stream.io.BufferedInputStream; + import org.apache.nifi.stream.io.BufferedOutputStream; + import org.apache.nifi.stream.io.GZIPOutputStream; + import org.apache.nifi.stream.io.LeakyBucketStreamThrottler; + import org.apache.nifi.stream.io.StreamThrottler; + import org.apache.nifi.stream.io.StreamUtils; + import org.apache.nifi.util.FlowFilePackager; + import org.apache.nifi.util.FlowFilePackagerV1; + import org.apache.nifi.util.FlowFilePackagerV2; + import org.apache.nifi.util.FlowFilePackagerV3; + import org.apache.nifi.util.FormatUtils; + import org.apache.nifi.util.ObjectHolder; + import org.apache.nifi.util.StopWatch; + + import com.sun.jersey.api.client.ClientResponse.Status; + + @SupportsBatching + @Tags({"http", "https", "remote", "copy", "archive"}) + @CapabilityDescription("Performs an HTTP Post with the content of the FlowFile") + public class PostHTTP extends AbstractProcessor { + + public static final String CONTENT_TYPE = "Content-Type"; + public static final String ACCEPT = "Accept"; + public static final String ACCEPT_ENCODING = "Accept-Encoding"; + public static final String APPLICATION_FLOW_FILE_V1 = "application/flowfile"; + public static final String APPLICATION_FLOW_FILE_V2 = "application/flowfile-v2"; + public static final String APPLICATION_FLOW_FILE_V3 = "application/flowfile-v3"; + public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream"; + public static final String FLOWFILE_CONFIRMATION_HEADER = "x-prefer-acknowledge-uri"; + public static final String LOCATION_HEADER_NAME = "Location"; + public static final String LOCATION_URI_INTENT_NAME = "x-location-uri-intent"; + public static final String LOCATION_URI_INTENT_VALUE = "flowfile-hold"; + public static final String GZIPPED_HEADER = "flowfile-gzipped"; + + public static final String PROTOCOL_VERSION_HEADER = "x-nifi-transfer-protocol-version"; + public static final String TRANSACTION_ID_HEADER = "x-nifi-transaction-id"; + public static final String PROTOCOL_VERSION = "3"; + + public static final PropertyDescriptor URL = new PropertyDescriptor.Builder() + .name("URL") + .description("The URL to POST to. The first part of the URL must be static. However, the path of the URL may be defined using the Attribute Expression Language. For example, https://${hostname} is not valid, but https://1.1.1.1:8080/files/${nf.file.name} is valid.") + .required(true) + .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("https?\\://.*"))) + .expressionLanguageSupported(true) + .build(); + public static final PropertyDescriptor SEND_AS_FLOWFILE = new PropertyDescriptor.Builder() + .name("Send as FlowFile") + .description("If true, will package the FlowFile's contents and attributes together and send the FlowFile Package; otherwise, will send only the FlowFile's content") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder() + .name("Connection Timeout") + .description("How long to wait when attempting to connect to the remote server before giving up") + .required(true) + .defaultValue("30 sec") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + public static final PropertyDescriptor DATA_TIMEOUT = new PropertyDescriptor.Builder() + .name("Data Timeout") + .description("How long to wait between receiving segments of data from the remote server before giving up and discarding the partial file") + .required(true) + .defaultValue("30 sec") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .name("Username") + .description("Username required to access the URL") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("Password") + .description("Password required to access the URL") + .required(false) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor USER_AGENT = new PropertyDescriptor.Builder() + .name("User Agent") + .description("What to report as the User Agent when we connect to the remote server") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor COMPRESSION_LEVEL = new PropertyDescriptor.Builder() + .name("Compression Level") + .description("Determines the GZIP Compression Level to use when sending the file; the value must be in the range of 0-9. A value of 0 indicates that the file will not be GZIP'ed") + .required(true) + .addValidator(StandardValidators.createLongValidator(0, 9, true)) + .defaultValue("0") + .build(); + public static final PropertyDescriptor ATTRIBUTES_AS_HEADERS_REGEX = new PropertyDescriptor.Builder() + .name("Attributes to Send as HTTP Headers (Regex)") + .description("Specifies the Regular Expression that determines the names of FlowFile attributes that should be sent as HTTP Headers") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .required(false) + .build(); + public static final PropertyDescriptor MAX_DATA_RATE = new PropertyDescriptor.Builder() + .name("Max Data to Post per Second") + .description("The maximum amount of data to send per second; this allows the bandwidth to be throttled to a specified data rate; if not specified, the data rate is not throttled") + .required(false) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Max Batch Size") + .description("If the Send as FlowFile property is true, specifies the max data size for a batch of FlowFiles to send in a single HTTP POST. If not specified, each FlowFile will be sent separately. If the Send as FlowFile property is false, this property is ignored") + .required(false) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("100 MB") + .build(); + public static final PropertyDescriptor CHUNKED_ENCODING = new PropertyDescriptor.Builder() + .name("Use Chunked Encoding") + .description("Specifies whether or not to use Chunked Encoding to send the data. If false, the entire content of the FlowFile will be buffered into memory.") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The Controller Service to use in order to obtain an SSL Context") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Files that are successfully send will be transferred to success").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Files that fail to send will transferred to failure").build(); + + private Set<Relationship> relationships; + private List<PropertyDescriptor> properties; + + private final AtomicReference<DestinationAccepts> acceptsRef = new AtomicReference<>(); + private final AtomicReference<StreamThrottler> throttlerRef = new AtomicReference<>(); + private final ConcurrentMap<String, Config> configMap = new ConcurrentHashMap<>(); + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(URL); + properties.add(MAX_BATCH_SIZE); + properties.add(MAX_DATA_RATE); + properties.add(SSL_CONTEXT_SERVICE); + properties.add(USERNAME); + properties.add(PASSWORD); + properties.add(SEND_AS_FLOWFILE); + properties.add(CHUNKED_ENCODING); + properties.add(COMPRESSION_LEVEL); + properties.add(CONNECTION_TIMEOUT); + properties.add(DATA_TIMEOUT); + properties.add(ATTRIBUTES_AS_HEADERS_REGEX); + properties.add(USER_AGENT); + this.properties = Collections.unmodifiableList(properties); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext context) { + final Collection<ValidationResult> results = new ArrayList<>(); + + if (context.getProperty(URL).getValue().startsWith("https") && context.getProperty(SSL_CONTEXT_SERVICE).getValue() == null) { + results.add(new ValidationResult.Builder() + .explanation("URL is set to HTTPS protocol but no SSLContext has been specified") + .valid(false) + .subject("SSL Context") + .build()); + } + + return results; + } + + @OnStopped + public void onStopped() { + this.acceptsRef.set(null); + + for (final Map.Entry<String, Config> entry : configMap.entrySet()) { + final Config config = entry.getValue(); + config.getConnectionManager().shutdown(); + } + + configMap.clear(); + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + final Double bytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B); + this.throttlerRef.set(bytesPerSecond == null ? null : new LeakyBucketStreamThrottler(bytesPerSecond.intValue())); + } + + private String getBaseUrl(final String url) { + final int index = url.indexOf("/", 9); + if (index < 0) { + return url; + } + + return url.substring(0, index); + } + + private Config getConfig(final String url, final ProcessContext context) { + final String baseUrl = getBaseUrl(url); + Config config = configMap.get(baseUrl); + if (config != null) { + return config; + } + + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final SSLContext sslContext; + try { + sslContext = createSSLContext(sslContextService); + } catch (final Exception e) { + throw new ProcessException(e); + } + + final SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, new String[] { "TLSv1" }, null, + SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER); + + final Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create() + .register("https", sslsf).build(); + + final PoolingHttpClientConnectionManager conMan = new PoolingHttpClientConnectionManager(socketFactoryRegistry); + conMan.setDefaultMaxPerRoute(context.getMaxConcurrentTasks()); + conMan.setMaxTotal(context.getMaxConcurrentTasks()); + config = new Config(conMan); + final Config existingConfig = configMap.putIfAbsent(baseUrl, config); + + return (existingConfig == null) ? config : existingConfig; + } + + + private SSLContext createSSLContext(final SSLContextService service) throws KeyStoreException, IOException, NoSuchAlgorithmException, + CertificateException, KeyManagementException, UnrecoverableKeyException + { + final KeyStore truststore = KeyStore.getInstance(service.getTrustStoreType()); + try (final InputStream in = new FileInputStream(new File(service.getTrustStoreFile()))) { + truststore.load(in, service.getTrustStorePassword().toCharArray()); + } + + final KeyStore keystore = KeyStore.getInstance(service.getKeyStoreType()); + try (final InputStream in = new FileInputStream(new File(service.getKeyStoreFile()))) { + keystore.load(in, service.getKeyStorePassword().toCharArray()); + } + + SSLContext sslContext = SSLContexts.custom() + .loadTrustMaterial(truststore, new TrustSelfSignedStrategy()) + .loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray()) + .build(); + + return sslContext; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + final boolean sendAsFlowFile = context.getProperty(SEND_AS_FLOWFILE).asBoolean(); + final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger(); + final String userAgent = context.getProperty(USER_AGENT).getValue(); + + final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom(); + requestConfigBuilder.setConnectionRequestTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + requestConfigBuilder.setConnectTimeout(context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + requestConfigBuilder.setRedirectsEnabled(false); + requestConfigBuilder.setSocketTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + final RequestConfig requestConfig = requestConfigBuilder.build(); + + final StreamThrottler throttler = throttlerRef.get(); + final ProcessorLog logger = getLogger(); + + final Double maxBatchBytes = context.getProperty(MAX_BATCH_SIZE).asDataSize(DataUnit.B); + String lastUrl = null; + long bytesToSend = 0L; + + final List<FlowFile> toSend = new ArrayList<>(); + DestinationAccepts destinationAccepts = null; + CloseableHttpClient client = null; + final String transactionId = UUID.randomUUID().toString(); + + final ObjectHolder<String> dnHolder = new ObjectHolder<>("none"); + while (true) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + break; + } + + final String url = context.getProperty(URL).evaluateAttributeExpressions(flowFile).getValue(); + try { + new java.net.URL(url); + } catch (final MalformedURLException e) { + logger.error("After substituting attribute values for {}, URL is {}; this is not a valid URL, so routing to failure", + new Object[]{flowFile, url}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + continue; + } + + // If this FlowFile doesn't have the same url, throw it back on the queue and stop grabbing FlowFiles + if (lastUrl != null && !lastUrl.equals(url)) { + session.transfer(flowFile); + break; + } + + lastUrl = url; + toSend.add(flowFile); + + if (client == null || destinationAccepts == null) { + final Config config = getConfig(url, context); + final HttpClientConnectionManager conMan = config.getConnectionManager(); + + final HttpClientBuilder clientBuilder = HttpClientBuilder.create(); + clientBuilder.setConnectionManager(conMan); + clientBuilder.setUserAgent(userAgent); + clientBuilder.addInterceptorFirst(new HttpResponseInterceptor() { + @Override + public void process(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException { + HttpCoreContext coreContext = HttpCoreContext.adapt(httpContext); + ManagedHttpClientConnection conn = coreContext.getConnection(ManagedHttpClientConnection.class); + SSLSession sslSession = conn.getSSLSession(); + + if ( sslSession != null ) { + final X509Certificate[] certChain = sslSession.getPeerCertificateChain(); + if (certChain == null || certChain.length == 0) { + throw new SSLPeerUnverifiedException("No certificates found"); + } + + final X509Certificate cert = certChain[0]; + dnHolder.set(cert.getSubjectDN().getName().trim()); + } + } + }); + + clientBuilder.disableAutomaticRetries(); + clientBuilder.disableContentCompression(); + + final String username = context.getProperty(USERNAME).getValue(); + final String password = context.getProperty(PASSWORD).getValue(); + // set the credentials if appropriate + if (username != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + if (password == null) { + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username)); + } else { + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); + }; + clientBuilder.setDefaultCredentialsProvider(credentialsProvider); + } + client = clientBuilder.build(); + + // determine whether or not destination accepts flowfile/gzip + destinationAccepts = config.getDestinationAccepts(); + if (destinationAccepts == null) { + try { + if (sendAsFlowFile) { + destinationAccepts = getDestinationAcceptance(client, url, getLogger(), transactionId); + } else { + destinationAccepts = new DestinationAccepts(false, false, false, false, null); + } + + config.setDestinationAccepts(destinationAccepts); + } catch (IOException e) { + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + logger.error("Unable to communicate with destination {} to determine whether or not it can accept flowfiles/gzip; routing {} to failure due to {}", new Object[]{url, flowFile, e}); + context.yield(); + return; + } + } + } + + // if we are not sending as flowfile, or if the destination doesn't accept V3 or V2 (streaming) format, + // then only use a single FlowFile + if (!sendAsFlowFile || (!destinationAccepts.isFlowFileV3Accepted() && !destinationAccepts.isFlowFileV2Accepted())) { + break; + } + + bytesToSend += flowFile.getSize(); + if (bytesToSend > maxBatchBytes.longValue()) { + break; + } + } + + if (toSend.isEmpty()) { + return; + } + + final String url = lastUrl; + final HttpPost post = new HttpPost(url); + final List<FlowFile> flowFileList = toSend; + final DestinationAccepts accepts = destinationAccepts; + final boolean isDestinationLegacyNiFi = accepts.getProtocolVersion() == null; + + final EntityTemplate entity = new EntityTemplate(new ContentProducer() { + @Override + public void writeTo(final OutputStream rawOut) throws IOException { + final OutputStream throttled = (throttler == null) ? rawOut : throttler.newThrottledOutputStream(rawOut); + OutputStream wrappedOut = new BufferedOutputStream(throttled); + if (compressionLevel > 0 && accepts.isGzipAccepted()) { + wrappedOut = new GZIPOutputStream(wrappedOut, compressionLevel); + } + + try (final OutputStream out = wrappedOut) { + for (final FlowFile flowFile : flowFileList) { + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream rawIn) throws IOException { + try (final InputStream in = new BufferedInputStream(rawIn)) { + + FlowFilePackager packager = null; + if (!sendAsFlowFile) { + packager = null; + } else if (accepts.isFlowFileV3Accepted()) { + packager = new FlowFilePackagerV3(); + } else if (accepts.isFlowFileV2Accepted()) { + packager = new FlowFilePackagerV2(); + } else if (accepts.isFlowFileV1Accepted()) { + packager = new FlowFilePackagerV1(); + } + + // if none of the above conditions is met, we should never get here, because + // we will have already verified that at least 1 of the FlowFile packaging + // formats is acceptable if sending as FlowFile. + if (packager == null) { + StreamUtils.copy(in, out); + } else { + final Map<String, String> flowFileAttributes; + if (isDestinationLegacyNiFi) { + // Old versions of NiFi expect nf.file.name and nf.file.path to indicate filename & path; + // in order to maintain backward compatibility, we copy the filename & path to those attribute keys. + flowFileAttributes = new HashMap<>(flowFile.getAttributes()); + flowFileAttributes.put("nf.file.name", flowFile.getAttribute(CoreAttributes.FILENAME.key())); + flowFileAttributes.put("nf.file.path", flowFile.getAttribute(CoreAttributes.PATH.key())); + } else { + flowFileAttributes = flowFile.getAttributes(); + } + + packager.packageFlowFile(in, out, flowFileAttributes, flowFile.getSize()); + } + } + } + }); + } + + out.flush(); + } + } + }); + + entity.setChunked(context.getProperty(CHUNKED_ENCODING).asBoolean()); + post.setEntity(entity); + post.setConfig(requestConfig); + + final String contentType; + if (sendAsFlowFile) { + if (accepts.isFlowFileV3Accepted()) { + contentType = APPLICATION_FLOW_FILE_V3; + } else if (accepts.isFlowFileV2Accepted()) { + contentType = APPLICATION_FLOW_FILE_V2; + } else if (accepts.isFlowFileV1Accepted()) { + contentType = APPLICATION_FLOW_FILE_V1; + } else { + logger.error("Cannot send data to {} because the destination does not accept FlowFiles and this processor is configured to deliver FlowFiles; rolling back session", new Object[]{url}); + session.rollback(); + context.yield(); + return; + } + } else { + final String attributeValue = toSend.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()); + contentType = (attributeValue == null) ? DEFAULT_CONTENT_TYPE : attributeValue; + } + + final String attributeHeaderRegex = context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue(); + if (attributeHeaderRegex != null && !sendAsFlowFile && flowFileList.size() == 1) { + final Pattern pattern = Pattern.compile(attributeHeaderRegex); + + final Map<String, String> attributes = flowFileList.get(0).getAttributes(); + for (final Map.Entry<String, String> entry : attributes.entrySet()) { + final String key = entry.getKey(); + if (pattern.matcher(key).matches()) { + post.setHeader(entry.getKey(), entry.getValue()); + } + } + } + + post.setHeader(CONTENT_TYPE, contentType); + post.setHeader(FLOWFILE_CONFIRMATION_HEADER, "true"); + post.setHeader(PROTOCOL_VERSION_HEADER, PROTOCOL_VERSION); + post.setHeader(TRANSACTION_ID_HEADER, transactionId); + if (compressionLevel > 0 && accepts.isGzipAccepted()) { + post.setHeader(GZIPPED_HEADER, "true"); + } + + // Do the actual POST + final String flowFileDescription = toSend.size() <= 10 ? toSend.toString() : toSend.size() + " FlowFiles"; + + final String uploadDataRate; + final long uploadMillis; + CloseableHttpResponse response = null; + try { + final StopWatch stopWatch = new StopWatch(true); + response = client.execute(post); + + // consume input stream entirely, ignoring its contents. If we + // don't do this, the Connection will not be returned to the pool + EntityUtils.consume(response.getEntity()); + stopWatch.stop(); + uploadDataRate = stopWatch.calculateDataRate(bytesToSend); + uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + } catch (final IOException e) { + logger.error("Failed to Post {} due to {}; transferring to failure", new Object[]{flowFileDescription, e}); + context.yield(); + for (FlowFile flowFile : toSend) { + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + return; + } finally { + if ( response != null ) { + try { + response.close(); + } catch (IOException e) { + getLogger().warn("Failed to close HTTP Response due to {}", new Object[] {e}); + } + } + } + + // If we get a 'SEE OTHER' status code and an HTTP header that indicates that the intent + // of the Location URI is a flowfile hold, we will store this holdUri. This prevents us + // from posting to some other webservice and then attempting to delete some resource to which + // we are redirected + final int responseCode = response.getStatusLine().getStatusCode(); + final String responseReason = response.getStatusLine().getReasonPhrase(); + String holdUri = null; + if (responseCode == HttpServletResponse.SC_SEE_OTHER) { + final Header locationUriHeader = response.getFirstHeader(LOCATION_URI_INTENT_NAME); + if (locationUriHeader != null) { + if (LOCATION_URI_INTENT_VALUE.equals(locationUriHeader.getValue())) { + final Header holdUriHeader = response.getFirstHeader(LOCATION_HEADER_NAME); + if (holdUriHeader != null) { + holdUri = holdUriHeader.getValue(); + } + } + } + + if (holdUri == null) { + for (FlowFile flowFile : toSend) { + flowFile = session.penalize(flowFile); + logger.error("Failed to Post {} to {}: sent content and received status code {}:{} but no Hold URI", new Object[]{flowFile, url, responseCode, responseReason}); + session.transfer(flowFile, REL_FAILURE); + } + return; + } + } + + if (holdUri == null) { + if (responseCode == HttpServletResponse.SC_SERVICE_UNAVAILABLE) { + for (FlowFile flowFile : toSend) { + flowFile = session.penalize(flowFile); + logger.error("Failed to Post {} to {}: response code was {}:{}; will yield processing, since the destination is temporarily unavailable", new Object[]{flowFile, url, responseCode, responseReason}); + session.transfer(flowFile, REL_FAILURE); + } + context.yield(); + return; + } + + if (responseCode >= 300) { + for (FlowFile flowFile : toSend) { + flowFile = session.penalize(flowFile); + logger.error("Failed to Post {} to {}: response code was {}:{}", new Object[]{flowFile, url, responseCode, responseReason}); + session.transfer(flowFile, REL_FAILURE); + } + return; + } + + logger.info("Successfully Posted {} to {} in {} at a rate of {}", new Object[]{ + flowFileDescription, url, FormatUtils.formatMinutesSeconds(uploadMillis, TimeUnit.MILLISECONDS), uploadDataRate}); + + for (final FlowFile flowFile : toSend) { + session.getProvenanceReporter().send(flowFile, url, "Remote DN=" + dnHolder.get(), uploadMillis, true); + session.transfer(flowFile, REL_SUCCESS); + } + return; + } + + // + // the response indicated a Hold URI; delete the Hold. + // + // determine the full URI of the Flow File's Hold; Unfortunately, the responses that are returned have + // changed over the past, so we have to take into account a few different possibilities. + String fullHoldUri = holdUri; + if (holdUri.startsWith("/contentListener")) { + // If the Hold URI that we get starts with /contentListener, it may not really be /contentListener, + // as this really indicates that it should be whatever we posted to -- if posting directly to the + // ListenHTTP component, it will be /contentListener, but if posting to a proxy/load balancer, we may + // be posting to some other URL. + fullHoldUri = url + holdUri.substring(16); + } else if (holdUri.startsWith("/")) { + // URL indicates the full path but not hostname or port; use the same hostname & port that we posted + // to but use the full path indicated by the response. + int firstSlash = url.indexOf("/", 8); + if (firstSlash < 0) { + firstSlash = url.length(); + } + final String beforeSlash = url.substring(0, firstSlash); + fullHoldUri = beforeSlash + holdUri; + } else if (!holdUri.startsWith("http")) { + // Absolute URL + fullHoldUri = url + (url.endsWith("/") ? "" : "/") + holdUri; + } + + final HttpDelete delete = new HttpDelete(fullHoldUri); + delete.setHeader(TRANSACTION_ID_HEADER, transactionId); + + while (true) { + try { + final HttpResponse holdResponse = client.execute(delete); + EntityUtils.consume(holdResponse.getEntity()); + final int holdStatusCode = holdResponse.getStatusLine().getStatusCode(); + final String holdReason = holdResponse.getStatusLine().getReasonPhrase(); + if (holdStatusCode >= 300) { + logger.error("Failed to delete Hold that destination placed on {}: got response code {}:{}; routing to failure", + new Object[]{flowFileDescription, holdStatusCode, holdReason}); + + for (FlowFile flowFile : toSend) { + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + return; + } + + logger.info("Successfully Posted {} to {} in {} milliseconds at a rate of {}", + new Object[]{flowFileDescription, url, uploadMillis, uploadDataRate}); + + for (FlowFile flowFile : toSend) { + session.getProvenanceReporter().send(flowFile, url); + session.transfer(flowFile, REL_SUCCESS); + } + return; + } catch (final IOException e) { + logger.warn("Failed to delete Hold that destination placed on {} due to {}", new Object[]{flowFileDescription, e}); + } + + if (!isScheduled()) { + context.yield(); + logger.warn("Failed to delete Hold that destination placed on {}; Processor has been stopped so routing FlowFile(s) to failure", new Object[]{flowFileDescription}); + for (FlowFile flowFile : toSend) { + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + return; + } + } + } + + + private DestinationAccepts getDestinationAcceptance(final HttpClient client, final String uri, final ProcessorLog logger, final String transactionId) throws IOException { + final HttpHead head = new HttpHead(uri); + head.addHeader(TRANSACTION_ID_HEADER, transactionId); + final HttpResponse response = client.execute(head); + + final int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode == Status.METHOD_NOT_ALLOWED.getStatusCode()) { + // we assume that the destination can support FlowFile v1 always. + return new DestinationAccepts(false, false, true, false, null); + } else if (statusCode == Status.OK.getStatusCode()) { + boolean acceptsFlowFileV3 = false; + boolean acceptsFlowFileV2 = false; + boolean acceptsFlowFileV1 = true; + boolean acceptsGzip = false; + Integer protocolVersion = null; + + Header[] headers = response.getHeaders(ACCEPT); + if (headers != null) { + for (final Header header : headers) { + for (final String accepted : header.getValue().split(",")) { + final String trimmed = accepted.trim(); + if (trimmed.equals(APPLICATION_FLOW_FILE_V3)) { + acceptsFlowFileV3 = true; + } else if (trimmed.equals(APPLICATION_FLOW_FILE_V2)) { + acceptsFlowFileV2 = true; + } else { + // we assume that the destination accepts FlowFile V1 because legacy versions + // of NiFi that accepted V1 did not use an Accept header to indicate it... or + // any other header. So the bets thing we can do is just assume that V1 is + // accepted, if we're going to send as FlowFile. + acceptsFlowFileV1 = true; + } + } + } + } + + final Header destinationVersion = response.getFirstHeader(PROTOCOL_VERSION_HEADER); + if (destinationVersion != null) { + try { + protocolVersion = Integer.valueOf(destinationVersion.getValue()); + } catch (final NumberFormatException e) { + // nothing to do here really.... it's an invalid value, so treat the same as if not specified + } + } + + if (acceptsFlowFileV3) { + logger.debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V3 + " if sending data as FlowFile"); + } else if (acceptsFlowFileV2) { + logger.debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V2 + " if sending data as FlowFile"); + } else if (acceptsFlowFileV1) { + logger.debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V1 + " if sending data as FlowFile"); + } + + headers = response.getHeaders(ACCEPT_ENCODING); + if (headers != null) { + for (final Header header : headers) { + for (final String accepted : header.getValue().split(",")) { + if (accepted.equalsIgnoreCase("gzip")) { + acceptsGzip = true; + } + } + } + } + + if (acceptsGzip) { + logger.debug("Connection to URI " + uri + " indicates that inline GZIP compression is supported"); + } else { + logger.debug("Connection to URI " + uri + " indicates that it does NOT support inline GZIP compression"); + } + + return new DestinationAccepts(acceptsFlowFileV3, acceptsFlowFileV2, acceptsFlowFileV1, acceptsGzip, protocolVersion); + } else { + logger.warn("Unable to communicate with destination; when attempting to perform an HTTP HEAD, got unexpected response code of " + statusCode + ": " + response.getStatusLine().getReasonPhrase()); + return new DestinationAccepts(false, false, false, false, null); + } + } + + private static class DestinationAccepts { + + private final boolean flowFileV1; + private final boolean flowFileV2; + private final boolean flowFileV3; + private final boolean gzip; + private final Integer protocolVersion; + + public DestinationAccepts(final boolean flowFileV3, final boolean flowFileV2, final boolean flowFileV1, + final boolean gzip, final Integer protocolVersion) { + this.flowFileV3 = flowFileV3; + this.flowFileV2 = flowFileV2; + this.flowFileV1 = flowFileV1; + this.gzip = gzip; + this.protocolVersion = protocolVersion; + } + + public boolean isFlowFileV3Accepted() { + return flowFileV3; + } + + public boolean isFlowFileV2Accepted() { + return flowFileV2; + } + + public boolean isFlowFileV1Accepted() { + return flowFileV1; + } + + public boolean isGzipAccepted() { + return gzip; + } + + public Integer getProtocolVersion() { + return protocolVersion; + } + } + + private static class Config { + + private volatile DestinationAccepts destinationAccepts; + private final HttpClientConnectionManager conMan; + + public Config(final HttpClientConnectionManager conMan) { + this.conMan = conMan; + } + + public DestinationAccepts getDestinationAccepts() { + return this.destinationAccepts; + } + + public void setDestinationAccepts(final DestinationAccepts destinationAccepts) { + this.destinationAccepts = destinationAccepts; + } + + public HttpClientConnectionManager getConnectionManager() { + return conMan; + } + } + }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java index 0000000,ef602c9..2fa71c8 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java @@@ -1,0 -1,297 +1,297 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.standard; + + import java.io.IOException; + import java.io.InputStream; + import java.net.Socket; + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.Date; + import java.util.HashSet; + import java.util.List; + import java.util.Map.Entry; + import java.util.Properties; + import java.util.Set; + + import javax.activation.DataHandler; + import javax.mail.Message; + import javax.mail.Message.RecipientType; + import javax.mail.Session; + import javax.mail.URLName; + import javax.mail.internet.AddressException; + import javax.mail.internet.InternetAddress; + import javax.mail.internet.MimeBodyPart; + import javax.mail.internet.MimeMessage; + import javax.mail.internet.MimeMultipart; + import javax.mail.internet.PreencodedMimeBodyPart; + import javax.mail.util.ByteArrayDataSource; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.ValidationContext; + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processor.io.InputStreamCallback; + import org.apache.nifi.processor.util.StandardValidators; + + import org.apache.commons.codec.binary.Base64; + + import com.sun.mail.smtp.SMTPTransport; + + @SupportsBatching + @Tags({"email", "put", "notify", "smtp"}) + @CapabilityDescription("Sends an e-mail to configured recipients for each incoming FlowFile") + public class PutEmail extends AbstractProcessor { + + public static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder() + .name("SMTP Hostname") + .description("The hostname of the SMTP host") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder() + .name("SMTP Port") + .description("The Port used for SMTP communications") + .required(true) + .defaultValue("25") + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); + public static final PropertyDescriptor FROM = new PropertyDescriptor.Builder() + .name("From") + .description("Specifies the Email address to use as the sender") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor TO = new PropertyDescriptor.Builder() + .name("To") + .description("The recipients to include in the To-Line of the email") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor CC = new PropertyDescriptor.Builder() + .name("CC") + .description("The recipients to include in the CC-Line of the email") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor BCC = new PropertyDescriptor.Builder() + .name("BCC") + .description("The recipients to include in the BCC-Line of the email") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder() + .name("Subject") + .description("The email subject") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("Message from NiFi") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor MESSAGE = new PropertyDescriptor.Builder() + .name("Message") + .description("The body of the email message") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor ATTACH_FILE = new PropertyDescriptor.Builder() + .name("Attach File") + .description("Specifies whether or not the FlowFile content should be attached to the email") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + public static final PropertyDescriptor INCLUDE_ALL_ATTRIBUTES = new PropertyDescriptor.Builder() + .name("Include All Attributes In Message") + .description("Specifies whether or not all FlowFile attributes should be recorded in the body of the email message") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that are successfully sent will be routed to this relationship").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles that fail to send will be routed to this relationship").build(); + + private List<PropertyDescriptor> properties; + private Set<Relationship> relationships; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(SMTP_HOSTNAME); + properties.add(SMTP_PORT); + properties.add(FROM); + properties.add(TO); + properties.add(CC); + properties.add(BCC); + properties.add(SUBJECT); + properties.add(MESSAGE); + properties.add(ATTACH_FILE); + properties.add(INCLUDE_ALL_ATTRIBUTES); + this.properties = Collections.unmodifiableList(properties); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext context) { + final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context)); + + final String to = context.getProperty(TO).getValue(); + final String cc = context.getProperty(CC).getValue(); + final String bcc = context.getProperty(BCC).getValue(); + + if (to == null && cc == null && bcc == null) { + errors.add(new ValidationResult.Builder().subject("To, CC, BCC").valid(false).explanation("Must specify at least one To/CC/BCC address").build()); + } + + return errors; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final Properties properties = new Properties(); + properties.setProperty("smtp.mail.host", context.getProperty(SMTP_HOSTNAME).getValue()); + final Session mailSession = Session.getInstance(properties); + final Message message = new MimeMessage(mailSession); + final ProcessorLog logger = getLogger(); + + try { + message.setFrom(InternetAddress.parse(context.getProperty(FROM).evaluateAttributeExpressions(flowFile).getValue())[0]); + + final InternetAddress[] toAddresses = toInetAddresses(context.getProperty(TO).evaluateAttributeExpressions(flowFile).getValue()); + message.setRecipients(RecipientType.TO, toAddresses); + + final InternetAddress[] ccAddresses = toInetAddresses(context.getProperty(CC).evaluateAttributeExpressions(flowFile).getValue()); + message.setRecipients(RecipientType.CC, ccAddresses); + + final InternetAddress[] bccAddresses = toInetAddresses(context.getProperty(BCC).evaluateAttributeExpressions(flowFile).getValue()); + message.setRecipients(RecipientType.BCC, bccAddresses); + + message.setHeader("X-Mailer", "NiFi"); + message.setSubject(context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue()); + String messageText = context.getProperty(MESSAGE).evaluateAttributeExpressions(flowFile).getValue(); + + if (context.getProperty(INCLUDE_ALL_ATTRIBUTES).asBoolean()) { + messageText = formatAttributes(flowFile, messageText); + } + + message.setText(messageText); + message.setSentDate(new Date()); + + if (context.getProperty(ATTACH_FILE).asBoolean()) { + final MimeBodyPart mimeText = new PreencodedMimeBodyPart("base64"); + mimeText.setDataHandler(new DataHandler(new ByteArrayDataSource(Base64.encodeBase64(messageText.getBytes("UTF-8")), "text/plain; charset=\"utf-8\""))); + final MimeBodyPart mimeFile = new MimeBodyPart(); + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream stream) throws IOException { + try { + mimeFile.setDataHandler(new DataHandler(new ByteArrayDataSource(stream, "application/octet-stream"))); + } catch (final Exception e) { + throw new IOException(e); + } + } + }); + + mimeFile.setFileName(flowFile.getAttribute(CoreAttributes.FILENAME.key())); + MimeMultipart multipart = new MimeMultipart(); + multipart.addBodyPart(mimeText); + multipart.addBodyPart(mimeFile); + message.setContent(multipart); + } + + final String smtpHost = context.getProperty(SMTP_HOSTNAME).getValue(); + final SMTPTransport transport = new SMTPTransport(mailSession, new URLName(smtpHost)); + try { + final int smtpPort = context.getProperty(SMTP_PORT).asInteger(); + transport.connect(new Socket(smtpHost, smtpPort)); + transport.sendMessage(message, message.getAllRecipients()); + } finally { + transport.close(); + } + + session.getProvenanceReporter().send(flowFile, "mailto:" + message.getAllRecipients()[0].toString()); + session.transfer(flowFile, REL_SUCCESS); + logger.info("Sent email as a result of receiving {}", new Object[]{flowFile}); + } catch (final Exception e) { + context.yield(); + logger.error("Failed to send email for {}: {}; routing to failure", new Object[]{flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + } + } + + public static final String BODY_SEPARATOR = "\n\n--------------------------------------------------\n"; + + private static String formatAttributes(final FlowFile flowFile, final String messagePrepend) { + StringBuilder message = new StringBuilder(messagePrepend); + message.append(BODY_SEPARATOR); + message.append("\nStandard FlowFile Metadata:"); + message.append(String.format("\n\t%1$s = '%2$s'", "id", flowFile.getId())); + message.append(String.format("\n\t%1$s = '%2$s'", "entryDate", new Date(flowFile.getEntryDate()))); + message.append(String.format("\n\t%1$s = '%2$s'", "fileSize", flowFile.getSize())); + message.append("\nFlowFile Attributes:"); + for (Entry<String, String> attribute : flowFile.getAttributes().entrySet()) { + message.append(String.format("\n\t%1$s = '%2$s'", attribute.getKey(), attribute.getValue())); + } + message.append("\n"); + return message.toString(); + } + + private static InternetAddress[] toInetAddresses(final String val) throws AddressException { + if (val == null) { + return new InternetAddress[0]; + } + return InternetAddress.parse(val); + } + + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFTP.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFTP.java index 0000000,6c8a816..dac367f mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFTP.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFTP.java @@@ -1,0 -1,141 +1,141 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.standard; + + import java.io.IOException; + import java.util.ArrayList; + import java.util.Collections; + import java.util.List; + import java.util.Map; + import java.util.TreeMap; + import java.util.concurrent.atomic.AtomicReference; + import java.util.regex.Matcher; + import java.util.regex.Pattern; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.OnScheduled; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.lifecycle.OnScheduled; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processor.util.StandardValidators; + import org.apache.nifi.processors.standard.util.FTPTransfer; + + @SupportsBatching + @Tags({"remote", "copy", "egress", "put", "ftp", "archive", "files"}) + @CapabilityDescription("Sends FlowFiles to an FTP Server") + public class PutFTP extends PutFileTransfer<FTPTransfer> { + + private static final Pattern PRE_SEND_CMD_PATTERN = Pattern.compile("^pre\\.cmd\\.(\\d+)$"); + private static final Pattern POST_SEND_CMD_PATTERN = Pattern.compile("^post\\.cmd\\.(\\d+)$"); + + private final AtomicReference<List<PropertyDescriptor>> preSendDescriptorRef = new AtomicReference<>(); + private final AtomicReference<List<PropertyDescriptor>> postSendDescriptorRef = new AtomicReference<>(); + + private List<PropertyDescriptor> properties; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(FTPTransfer.HOSTNAME); + properties.add(FTPTransfer.PORT); + properties.add(FTPTransfer.USERNAME); + properties.add(FTPTransfer.PASSWORD); + properties.add(FTPTransfer.REMOTE_PATH); + properties.add(FTPTransfer.CREATE_DIRECTORY); + properties.add(FTPTransfer.BATCH_SIZE); + properties.add(FTPTransfer.CONNECTION_TIMEOUT); + properties.add(FTPTransfer.DATA_TIMEOUT); + properties.add(FTPTransfer.CONFLICT_RESOLUTION); + properties.add(FTPTransfer.DOT_RENAME); + properties.add(FTPTransfer.TEMP_FILENAME); + properties.add(FTPTransfer.TRANSFER_MODE); + properties.add(FTPTransfer.CONNECTION_MODE); + properties.add(FTPTransfer.REJECT_ZERO_BYTE); + properties.add(FTPTransfer.LAST_MODIFIED_TIME); + properties.add(FTPTransfer.PERMISSIONS); + properties.add(FTPTransfer.USE_COMPRESSION); + + this.properties = Collections.unmodifiableList(properties); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected void beforePut(final FlowFile flowFile, final ProcessContext context, final FTPTransfer transfer) throws IOException { + transfer.sendCommands(getCommands(preSendDescriptorRef.get(), context, flowFile), flowFile); + } + + @Override + protected void afterPut(final FlowFile flowFile, final ProcessContext context, final FTPTransfer transfer) throws IOException { + transfer.sendCommands(getCommands(postSendDescriptorRef.get(), context, flowFile), flowFile); + } + + @Override + protected FTPTransfer getFileTransfer(final ProcessContext context) { + return new FTPTransfer(context, getLogger()); + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamic(true) + .build(); + } + + @OnScheduled + public void determinePrePostSendProperties(final ProcessContext context) { + final Map<Integer, PropertyDescriptor> preDescriptors = new TreeMap<>(); + final Map<Integer, PropertyDescriptor> postDescriptors = new TreeMap<>(); + + for (final PropertyDescriptor descriptor : context.getProperties().keySet()) { + final String name = descriptor.getName(); + final Matcher preMatcher = PRE_SEND_CMD_PATTERN.matcher(name); + if (preMatcher.matches()) { + final int index = Integer.parseInt(preMatcher.group(1)); + preDescriptors.put(index, descriptor); + } else { + final Matcher postMatcher = POST_SEND_CMD_PATTERN.matcher(name); + if (postMatcher.matches()) { + final int index = Integer.parseInt(postMatcher.group(1)); + postDescriptors.put(index, descriptor); + } + } + } + + final List<PropertyDescriptor> preDescriptorList = new ArrayList<>(preDescriptors.values()); + final List<PropertyDescriptor> postDescriptorList = new ArrayList<>(postDescriptors.values()); + this.preSendDescriptorRef.set(preDescriptorList); + this.postSendDescriptorRef.set(postDescriptorList); + } + + private List<String> getCommands(final List<PropertyDescriptor> descriptors, final ProcessContext context, final FlowFile flowFile) { + final List<String> cmds = new ArrayList<>(); + for (final PropertyDescriptor descriptor : descriptors) { + cmds.add(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue()); + } + + return cmds; + } + }
