http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashAttribute.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashAttribute.java
index 0000000,5fc7e5a..e39ab26
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashAttribute.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashAttribute.java
@@@ -1,0 -1,255 +1,255 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.SortedMap;
+ import java.util.TreeMap;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.regex.Matcher;
+ import java.util.regex.Pattern;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.util.StandardValidators;
+ 
+ import org.apache.commons.codec.digest.DigestUtils;
+ import org.apache.commons.lang3.StringUtils;
+ 
+ /**
+  * <p>
+  * This processor identifies groups of user-specified flowfile attributes and
+  * assigns a unique hash value to each group, recording this hash value in the
+  * flowfile's attributes using a user-specified attribute key. The groups are
+  * identified dynamically and preserved across application restarts. </p>
+  *
+  * <p>
+  * The user must supply optional processor properties during runtime to
+  * correctly configure this processor. The optional property key will be used 
as
+  * the flowfile attribute key for attribute inspection. The value must be a
+  * valid regular expression. This regular expression is evaluated against the
+  * flowfile attribute values. If the regular expression contains a capturing
+  * group, the value of that group will be used when comparing flow file
+  * attributes. Otherwise, the original flow file attribute's value will be 
used
+  * if and only if the value matches the given regular expression. </p>
+  *
+  * <p>
+  * If a flowfile does not have an attribute entry for one or more processor
+  * configured values, then the flowfile is routed to failure. </p>
+  *
+  * <p>
+  * An example hash value identification:
+  *
+  * Assume Processor Configured with Two Properties ("MDKey1" = ".*" and 
"MDKey2"
+  * = "(.).*").
+  *
+  * FlowFile 1 has the following attributes: MDKey1 = a MDKey2 = b
+  *
+  * and will be assigned to group 1 (since no groups exist yet)
+  *
+  * FlowFile 2 has the following attributes: MDKey1 = 1 MDKey2 = 2
+  *
+  * and will be assigned to group 2 (attribute keys do not match existing 
groups)
+  *
+  * FlowFile 3 has the following attributes: MDKey1 = a MDKey2 = z
+  *
+  * and will be assigned to group 3 (attribute keys do not match existing 
groups)
+  *
+  * FlowFile 4 has the following attribute: MDKey1 = a MDKey2 = bad
+  *
+  * and will be assigned to group 1 (because the value of MDKey1 has the 
regular
+  * expression ".*" applied to it, and that evaluates to the same as MDKey1
+  * attribute of the first flow file. Similarly, the capturing group for the
+  * MDKey2 property indicates that only the first character of the MDKey2
+  * attribute must match, and the first character of MDKey2 for Flow File 1 and
+  * Flow File 4 are both 'b'.)
+  *
+  * FlowFile 5 has the following attributes: MDKey1 = a
+  *
+  * and will route to failure because it does not have MDKey2 entry in its
+  * attribute
+  * </p>
+  *
+  * <p>
+  * The following flow file attributes are created or modified: <ul>
+  * <li><b>&lt;group.id.attribute.key&gt;</b> - The hash value.</li> </ul> </p>
+  */
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"attributes", "hash"})
+ @CapabilityDescription("Hashes together the key/value pairs of several 
FlowFile Attributes and adds the hash as a new attribute. "
+         + "Optional properties are to be added such that the name of the 
property is the name of a FlowFile Attribute to consider "
+         + "and the value of the property is a regular expression that, if 
matched by the attribute value, will cause that attribute "
+         + "to be used as part of the hash. If the regular expression contains 
a capturing group, only the value of the capturing "
+         + "group will be used.")
+ public class HashAttribute extends AbstractProcessor {
+ 
+     public static final PropertyDescriptor HASH_VALUE_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+             .name("Hash Value Attribute Key")
+             .description("The name of the FlowFile Attribute where the hash 
value should be stored")
+             .required(true)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .build();
+ 
+     public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success").description("Used for FlowFiles that 
have a hash value added").build();
+     public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure").description("Used for FlowFiles that are 
missing required attributes").build();
+ 
+     private Set<Relationship> relationships;
+     private List<PropertyDescriptor> properties;
+     private final AtomicReference<Map<String, Pattern>> regexMapRef = new 
AtomicReference<>(Collections.<String, Pattern>emptyMap());
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_FAILURE);
+         relationships.add(REL_SUCCESS);
+         this.relationships = Collections.unmodifiableSet(relationships);
+ 
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(HASH_VALUE_ATTRIBUTE);
+         this.properties = Collections.unmodifiableList(properties);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+         return new PropertyDescriptor.Builder()
+                 .name(propertyDescriptorName)
+                 .addValidator(StandardValidators.createRegexValidator(0, 1, 
false))
+                 .required(false)
+                 .dynamic(true)
+                 .build();
+     }
+ 
+     @Override
+     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+         if (descriptor.isRequired()) {
+             return;
+         }
+ 
+         final Map<String, Pattern> patternMap = new 
HashMap<>(regexMapRef.get());
+         if (newValue == null) {
+             patternMap.remove(descriptor.getName());
+         } else {
+             if (newValue.equals(".*")) {
+                 patternMap.put(descriptor.getName(), null);
+             } else {
+                 final Pattern pattern = Pattern.compile(newValue);
+                 patternMap.put(descriptor.getName(), pattern);
+             }
+         }
+ 
+         regexMapRef.set(Collections.unmodifiableMap(patternMap));
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final Map<String, Pattern> patterns = regexMapRef.get();
+         final ProcessorLog logger = getLogger();
+ 
+         final SortedMap<String, String> attributes = 
getRelevantAttributes(flowFile, patterns);
+         if (attributes.size() != patterns.size()) {
+             final Set<String> wantedKeys = patterns.keySet();
+             final Set<String> foundKeys = attributes.keySet();
+             final StringBuilder missingKeys = new StringBuilder();
+             for (final String wantedKey : wantedKeys) {
+                 if (!foundKeys.contains(wantedKey)) {
+                     missingKeys.append(wantedKey).append(" ");
+                 }
+             }
+ 
+             logger.error("routing {} to 'failure' because of missing 
attributes: {}", new Object[]{flowFile, missingKeys.toString()});
+             session.transfer(flowFile, REL_FAILURE);
+         } else {
+             // create single string of attribute key/value pairs to use for 
group ID hash
+             final StringBuilder hashableValue = new StringBuilder();
+             for (final Map.Entry<String, String> entry : 
attributes.entrySet()) {
+                 hashableValue.append(entry.getKey());
+                 if (StringUtils.isBlank(entry.getValue())) {
+                     hashableValue.append("EMPTY");
+                 } else {
+                     hashableValue.append(entry.getValue());
+                 }
+             }
+ 
+             // create group ID
+             final String hashValue = 
DigestUtils.md5Hex(hashableValue.toString());
+ 
+             logger.info("adding Hash Value {} to attributes for {} and 
routing to success", new Object[]{hashValue, flowFile});
+             flowFile = session.putAttribute(flowFile, 
context.getProperty(HASH_VALUE_ATTRIBUTE).getValue(), hashValue);
+             session.getProvenanceReporter().modifyAttributes(flowFile);
+             session.transfer(flowFile, REL_SUCCESS);
+         }
+     }
+ 
+     private SortedMap<String, String> getRelevantAttributes(final FlowFile 
flowFile, final Map<String, Pattern> patterns) {
+         final SortedMap<String, String> attributeMap = new TreeMap<>();
+         for (final Map.Entry<String, Pattern> entry : patterns.entrySet()) {
+             final String attributeName = entry.getKey();
+             final String attributeValue = 
flowFile.getAttribute(attributeName);
+             if (attributeValue != null) {
+                 final Pattern pattern = entry.getValue();
+                 if (pattern == null) {
+                     attributeMap.put(attributeName, attributeValue);
+                 } else {
+                     final Matcher matcher = pattern.matcher(attributeValue);
+                     if (matcher.matches()) {
+                         if (matcher.groupCount() == 0) {
+                             attributeMap.put(attributeName, matcher.group(0));
+                         } else {
+                             attributeMap.put(attributeName, matcher.group(1));
+                         }
+                     }
+                 }
+             }
+         }
+ 
+         return attributeMap;
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java
index 0000000,bb71589..827653b
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java
@@@ -1,0 -1,151 +1,151 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.security.DigestOutputStream;
+ import java.security.MessageDigest;
+ import java.security.NoSuchAlgorithmException;
+ import java.security.Security;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.NullOutputStream;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.ObjectHolder;
+ 
+ @EventDriven
+ @SupportsBatching
+ @Tags({"hash", "content", "MD5", "SHA-1", "SHA-256"})
+ @CapabilityDescription("Calculates a hash value for the Content of a FlowFile 
and puts that hash value on the FlowFile as an attribute whose name is 
determined by the <Hash Attribute Name> property")
+ public class HashContent extends AbstractProcessor {
+ 
+     public static final PropertyDescriptor ATTRIBUTE_NAME = new 
PropertyDescriptor.Builder()
+             .name("Hash Attribute Name")
+             .description("The name of the FlowFile Attribute into which the 
Hash Value should be written. If the value already exists, it will be 
overwritten")
+             .required(true)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .defaultValue("hash.value")
+             .build();
+ 
+     public static final PropertyDescriptor HASH_ALGORITHM = new 
PropertyDescriptor.Builder()
+             .name("Hash Algorithm")
+             .description("Determines what hashing algorithm should be used to 
perform the hashing function")
+             .required(true)
+             .allowableValues(Security.getAlgorithms("MessageDigest"))
+             .defaultValue("MD5")
+             .build();
+ 
+     public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+             .description("FlowFiles that are process successfully will be 
sent to this relationship").build();
+     public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+             .description("Any FlowFile that cannot be processed successfully 
will be sent to this relationship without any attribute being added").build();
+ 
+     private List<PropertyDescriptor> properties;
+     private Set<Relationship> relationships;
+ 
+     @Override
+     protected void init(ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> props = new ArrayList<>();
+         props.add(ATTRIBUTE_NAME);
+         props.add(HASH_ALGORITHM);
+         properties = Collections.unmodifiableList(props);
+ 
+         final Set<Relationship> rels = new HashSet<>();
+         rels.add(REL_SUCCESS);
+         rels.add(REL_FAILURE);
+         relationships = Collections.unmodifiableSet(rels);
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+         final String algorithm = 
context.getProperty(HASH_ALGORITHM).getValue();
+         final MessageDigest digest;
+         try {
+             digest = MessageDigest.getInstance(algorithm);
+         } catch (NoSuchAlgorithmException e) {
+             logger.error("Failed to process {} due to {}; routing to 
failure", new Object[]{flowFile, e});
+             session.transfer(flowFile, REL_FAILURE);
+             return;
+         }
+ 
+         final ObjectHolder<String> hashValueHolder = new ObjectHolder<>(null);
+ 
+         try {
+             session.read(flowFile, new InputStreamCallback() {
+                 @Override
+                 public void process(final InputStream in) throws IOException {
+                     try (final DigestOutputStream digestOut = new 
DigestOutputStream(new NullOutputStream(), digest)) {
+                         StreamUtils.copy(in, digestOut);
+ 
+                         final byte[] hash = digest.digest();
+                         final StringBuilder strb = new 
StringBuilder(hash.length * 2);
+                         for (int i = 0; i < hash.length; i++) {
+                             strb.append(Integer.toHexString((hash[i] & 0xFF) 
| 0x100).substring(1, 3));
+                         }
+ 
+                         hashValueHolder.set(strb.toString());
+                     }
+                 }
+             });
+ 
+             final String attributeName = 
context.getProperty(ATTRIBUTE_NAME).getValue();
+             flowFile = session.putAttribute(flowFile, attributeName, 
hashValueHolder.get());
+             logger.info("Successfully added attribute '{}' to {} with a value 
of {}; routing to success", new Object[]{attributeName, flowFile, 
hashValueHolder.get()});
+             session.getProvenanceReporter().modifyAttributes(flowFile);
+             session.transfer(flowFile, REL_SUCCESS);
+         } catch (final Exception e) {
+             logger.error("Failed to process {} due to {}; routing to 
failure", new Object[]{flowFile, e});
+             session.transfer(flowFile, REL_FAILURE);
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/IdentifyMimeType.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/IdentifyMimeType.java
index 0000000,50bdba2..a52ac3b
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/IdentifyMimeType.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/IdentifyMimeType.java
@@@ -1,0 -1,474 +1,474 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.BufferedInputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.zip.ZipInputStream;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.util.FlowFilePackagerV1;
+ import org.apache.nifi.util.FlowFilePackagerV3;
+ import org.apache.nifi.util.ObjectHolder;
+ 
+ import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+ 
+ /**
+  * <p>
+  * Attempts to detect the MIME Type of a FlowFile by examining its contents. 
If
+  * the MIME Type is determined, it is added to an attribute with the name
+  * mime.type
+  * </p>
+  *
+  * <p>
+  * The following MIME Types are supported:
+  *
+  * <ul>
+  * <li>application/gzip</li>
+  * <li>application/bzip2</li>
+  * <li>application/flowfile-v3</li>
+  * <li>application/flowfile-v1 (requires Identify TAR be set to true)</li>
+  * <li>application/xml</li>
+  * <li>video/mp4</li>
+  * <li>video/x-m4v</li>
+  * <li>video/mp4a-latm</li>
+  * <li>video/quicktime</li>
+  * <li>video/mpeg</li>
+  * <li>audio/wav</li>
+  * <li>audio/mp3</li>
+  * <li>image/bmp</li>
+  * <li>image/png</li>
+  * <li>image/jpg</li>
+  * <li>image/gif</li>
+  * <li>image/tif</li>
+  * <li>application/vnd.ms-works</li>
+  * <li>application/msexcel</li>
+  * <li>application/mspowerpoint</li>
+  * <li>application/msaccess</li>
+  * <li>application/x-ms-wmv</li>
+  * <li>application/pdf</li>
+  * <li>application/x-rpm</li>
+  * <li>application/tar</li>
+  * <li>application/x-7z-compressed</li>
+  * <li>application/java-archive</li>
+  * <li>application/zip</li>
+  * <li>application/x-lzh</li>
+  * </ul>
+  * </p>
+  */
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"compression", "gzip", "bzip2", "zip", "MIME", "mime.type", "file", 
"identify"})
+ @CapabilityDescription("Attempts to identify the MIME Type used for a 
FlowFile. If the MIME Type can be identified, "
+         + "an attribute with the name 'mime.type' is added with the value 
being the MIME Type. If the MIME Type cannot be determined, "
+         + "the value will be set to 'application/octet-stream'. Some MIME 
Types require reading a significant amount of data; for these MIME Types, their 
identification "
+         + "is optional. The algorithm may have to read the entire contents of 
the file for each type of identification.")
+ public class IdentifyMimeType extends AbstractProcessor {
+ 
+     public static final PropertyDescriptor IDENTIFY_ZIP = new 
PropertyDescriptor.Builder()
+             .name("Identify ZIP")
+             .description("Determines whether or not to attempt in depth 
identification of ZIP MIME types")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("false")
+             .build();
+     public static final PropertyDescriptor IDENTIFY_TAR = new 
PropertyDescriptor.Builder()
+             .name("Identify TAR")
+             .description("Determines whether or not to attempt in depth 
identification of TAR MIME types")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("false")
+             .build();
+ 
+     public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success").description("All FlowFiles are routed to 
success").build();
+ 
+     private Set<Relationship> relationships;
+     private List<PropertyDescriptor> properties;
+ 
+     private final List<MagicHeader> magicHeaders;
+     private final List<MagicHeader> zipMagicHeaders;
+     private final List<MagicHeader> tarMagicHeaders;
+     private final List<ContentScanningMimeTypeIdentifier> contentScanners;
+     private final int magicHeaderMaxLength;
+ 
+     public IdentifyMimeType() {
+         // compile a list of Magic Header detectors
+         final List<MagicHeader> headers = new ArrayList<>();
+         headers.add(new SimpleMagicHeader("application/gzip", new 
byte[]{0x1f, (byte) 0x8b}));
+         headers.add(new SimpleMagicHeader("application/bzip2", new 
byte[]{0x42, 0x5a}));
+         headers.add(new SimpleMagicHeader("application/flowfile-v3", 
FlowFilePackagerV3.MAGIC_HEADER));
+         headers.add(new SimpleMagicHeader("application/xml", new byte[]{0x3c, 
0x3f, 0x78, 0x6d, 0x6c, 0x20}));
+         headers.add(new SimpleMagicHeader("video/mp4", new byte[]{0, 0, 0, 
0x14, 0x66, 0x74, 0x79, 0x70, 0x69, 0x73, 0x6F, 0x6D}));
+         headers.add(new SimpleMagicHeader("video/mp4", new byte[]{0, 0, 0, 
0x14, 0x66, 0x74, 0x79, 0x70, 0x33, 0x67, 0x70, 0x35}));
+         headers.add(new SimpleMagicHeader("video/mp4", new byte[]{0, 0, 0, 
0x14, 0x66, 0x74, 0x79, 0x70, 0X4d, 0X53, 0X4e, 0X56, 0X01, 0X29, 0, 0X46, 
0X4d, 0X53, 0X4e, 0X56, 0X6d, 0X70, 0X34, 0X32}));
+         headers.add(new SimpleMagicHeader("video/x-m4v", new byte[]{0, 0, 0, 
0x18, 0x66, 0x74, 0x79, 0x70, 0x6D, 0x70, 0x34, 0x32}));
+         headers.add(new SimpleMagicHeader("video/mp4a-latm", new byte[]{0, 0, 
0, 0x18, 0x66, 0x74, 0x79, 0x70, 0x4D, 0x34, 0x41, 0x20}));
+         headers.add(new SimpleMagicHeader("video/quicktime", new byte[]{0, 0, 
0, 0x14, 0x66, 0x74, 0x79, 0x70, 0x71, 0x74, 0x20, 0x20}));
+         headers.add(new SimpleMagicHeader("video/quicktime", new byte[]{0x6D, 
0x6F, 0x6F, 0x76}, 4));
+         headers.add(new SimpleMagicHeader("audio/mp3", new byte[]{0x49, 0x44, 
0x33}));
+         headers.add(new SimpleMagicHeader("image/bmp", new byte[]{0x42, 
0x4D}));
+         headers.add(new SimpleMagicHeader("image/png", new byte[]{(byte) 
0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A}));
+         headers.add(new SimpleMagicHeader("image/jpg", new byte[]{(byte) 
0xFF, (byte) 0xD8, (byte) 0xFF}));
+         headers.add(new SimpleMagicHeader("image/gif", new byte[]{0x47, 0x49, 
0x46, 0x38, 0x37, 0x61}));
+         headers.add(new SimpleMagicHeader("image/gif", new byte[]{0x47, 0x49, 
0x46, 0x38, 0x39, 0x61}));
+         headers.add(new SimpleMagicHeader("image/tif", new byte[]{0x49, 0x20, 
0x49}));
+         headers.add(new SimpleMagicHeader("image/tif", new byte[]{0x49, 0x49, 
0x2A, 0x00}));
+         headers.add(new SimpleMagicHeader("image/tif", new byte[]{0x4D, 0x4D, 
0x00, 0x2A}));
+         headers.add(new SimpleMagicHeader("image/tif", new byte[]{0x4D, 0x4D, 
0x00, 0x2B}));
+         headers.add(new SimpleMagicHeader("application/vnd.ms-works", new 
byte[]{(byte) 0xFF, 0x00, 0x02, 0x00, 0x04, 0x04, 0x05, 0x54, 0x02, 0x00}));
+         headers.add(new SimpleMagicHeader("application/msexcel", new 
byte[]{0x09, 0x08, 0x10, 0, 0, 0x06, 0x05, 0}, 512));
+         headers.add(new SimpleMagicHeader("application/mspowerpoint", new 
byte[]{0x00, 0x6E, 0x1E, (byte) 0xF0}, 512));
+         headers.add(new SimpleMagicHeader("application/mspowerpoint", new 
byte[]{0x0F, 0x00, (byte) 0xE8, 0x03}, 512));
+         headers.add(new SimpleMagicHeader("application/mspowerpoint", new 
byte[]{(byte) 0xA0, 0x46, 0x1D, (byte) 0xF0}, 512));
+         headers.add(new CompoundMagicHeader("application/mspowerpoint",
+                 new SimpleMagicHeader("", new byte[]{(byte) 0xFD, (byte) 
0xFF, (byte) 0xFF, (byte) 0xFF}, 512),
+                 new SimpleMagicHeader("", new byte[]{0x00, 0x00, 0x00}, 
517)));
+         headers.add(new SimpleMagicHeader("application/msaccess", new 
byte[]{0x00, 0x01, 0x00, 0x00, 0x53, 0x74, 0x61, 0x6E, 0x64, 0x61, 0x72, 0x64, 
0x20, 0x41, 0x43, 0x45, 0x20, 0x44, 0x42}));
+         headers.add(new SimpleMagicHeader("application/msaccess", new 
byte[]{0x00, 0x01, 0x00, 0x00, 0x53, 0x74, 0x61, 0x6E, 0x64, 0x61, 0x72, 0x64, 
0x20, 0x4A, 0x65, 0x74, 0x20, 0x44, 0x42}));
+         for (byte b : new byte[]{0x10, 0x1F, 0x22, 0x23, 0x28, 0x29}) {
+             headers.add(new SimpleMagicHeader("application/msaccess", new 
byte[]{(byte) 0xFD, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, b, 0x00}, 512));
+             headers.add(new SimpleMagicHeader("application/msaccess", new 
byte[]{(byte) 0xFD, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, b, 0x02}, 512));
+         }
+         headers.add(new SimpleMagicHeader("application/x-ms-wmv", new 
byte[]{0x30, 0x26, (byte) 0xB2, 0x75, (byte) 0x8E, 0x66, (byte) 0xCF, 0x11, 
(byte) 0xA6, (byte) 0xD9, 0x00, (byte) 0xAA, 0x00, 0x62, (byte) 0xCE, 0x6C}));
+         headers.add(new SimpleMagicHeader("application/pdf", new byte[]{0x25, 
0x50, 0x44, 0x46}));
+         headers.add(new SimpleMagicHeader("application/x-rpm", new 
byte[]{(byte) 0xED, (byte) 0xAB, (byte) 0xEE, (byte) 0xDB}));
+         headers.add(new SimpleMagicHeader("application/x-7z-compressed", new 
byte[]{0x37, 0x7A, (byte) 0xBC, (byte) 0xAF, 0x27, 0x1C}));
+         headers.add(new SimpleMagicHeader("application/java-archive", new 
byte[]{0x4A, 0x41, 0x52, 0x43, 0x53, 0x00}));
+         headers.add(new SimpleMagicHeader("application/java-archive", new 
byte[]{0x50, 0x4B, 0x03, 0x04, 0x14, 0x00, 0x08}));
+         headers.add(new SimpleMagicHeader("application/java-archive", new 
byte[]{0x50, 0x4B, 0x03, 0x04, (byte) 0xA0, 0x00, 0x00}));
+         headers.add(new SimpleMagicHeader("application/x-lzh", new 
byte[]{0x2D, 0x6C, 0x68}, 2));
+         headers.add(new CompoundMagicHeader("audio/wav",
+                 new SimpleMagicHeader("", new byte[]{0x52, 0x49, 0x46, 0x46}),
+                 new SimpleMagicHeader("", new byte[]{0x57, 0x41, 0x56, 0x45, 
0x66, 0x6D, 0x74, 0x20}, 8)));
+         for (int nibble = 0xB0; nibble <= 0xBF; nibble++) {
+             headers.add(new SimpleMagicHeader("video/mpeg", new byte[]{0x00, 
0x00, 0x01, (byte) nibble}));
+         }
+         this.magicHeaders = Collections.unmodifiableList(headers);
+ 
+         // additional Magic Header detectors that will be turned off based on 
property settings
+         final List<MagicHeader> zipHeaders = new ArrayList<>();
+         zipHeaders.add(new SimpleMagicHeader("application/zip", new 
byte[]{0x50, 0x4B, 0x03, 0x04}));
+         this.zipMagicHeaders = Collections.unmodifiableList(zipHeaders);
+         final List<MagicHeader> tarHeaders = new ArrayList<>();
+         tarHeaders.add(new SimpleMagicHeader("application/tar", new 
byte[]{0x75, 0x73, 0x74, 0x61, 0x72}, 257));
+         this.tarMagicHeaders = Collections.unmodifiableList(tarHeaders);
+ 
+         // determine the max length that we need to buffer for magic headers
+         int max = 0;
+         for (final MagicHeader header : magicHeaders) {
+             max = Math.max(max, header.getRequiredBufferLength());
+         }
+         for (final MagicHeader header : zipMagicHeaders) {
+             max = Math.max(max, header.getRequiredBufferLength());
+         }
+         for (final MagicHeader header : tarMagicHeaders) {
+             max = Math.max(max, header.getRequiredBufferLength());
+         }
+         this.magicHeaderMaxLength = max;
+ 
+         // create list of Content Scanners
+         final List<ContentScanningMimeTypeIdentifier> scanningIdentifiers = 
new ArrayList<>();
+         scanningIdentifiers.add(new ZipIdentifier());
+         scanningIdentifiers.add(new TarIdentifier());
+         this.contentScanners = 
Collections.unmodifiableList(scanningIdentifiers);
+     }
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_SUCCESS);
+         this.relationships = Collections.unmodifiableSet(relationships);
+ 
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(IDENTIFY_ZIP);
+         properties.add(IDENTIFY_TAR);
+         this.properties = Collections.unmodifiableList(properties);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+         final boolean identifyZip = 
context.getProperty(IDENTIFY_ZIP).asBoolean();
+         final boolean identifyTar = 
context.getProperty(IDENTIFY_TAR).asBoolean();
+ 
+         final ObjectHolder<String> mimeTypeRef = new ObjectHolder<>(null);
+         session.read(flowFile, new InputStreamCallback() {
+             @Override
+             public void process(final InputStream stream) throws IOException {
+                 try (final InputStream in = new BufferedInputStream(stream)) {
+                     // read in up to magicHeaderMaxLength bytes
+                     in.mark(magicHeaderMaxLength);
+                     byte[] header = new byte[magicHeaderMaxLength];
+                     for (int i = 0; i < header.length; i++) {
+                         final int next = in.read();
+                         if (next >= 0) {
+                             header[i] = (byte) next;
+                         } else if (i == 0) {
+                             header = new byte[0];
+                         } else {
+                             final byte[] newBuffer = new byte[i - 1];
+                             System.arraycopy(header, 0, newBuffer, 0, i - 1);
+                             header = newBuffer;
+                             break;
+                         }
+                     }
+                     in.reset();
+ 
+                     for (final MagicHeader magicHeader : magicHeaders) {
+                         if (magicHeader.matches(header)) {
+                             mimeTypeRef.set(magicHeader.getMimeType());
+                             return;
+                         }
+                     }
+ 
+                     if (!identifyZip) {
+                         for (final MagicHeader magicHeader : zipMagicHeaders) 
{
+                             if (magicHeader.matches(header)) {
+                                 mimeTypeRef.set(magicHeader.getMimeType());
+                                 return;
+                             }
+                         }
+                     }
+ 
+                     if (!identifyTar) {
+                         for (final MagicHeader magicHeader : tarMagicHeaders) 
{
+                             if (magicHeader.matches(header)) {
+                                 mimeTypeRef.set(magicHeader.getMimeType());
+                                 return;
+                             }
+                         }
+                     }
+                 }
+             }
+         });
+ 
+         String mimeType = mimeTypeRef.get();
+         if (mimeType == null) {
+             for (final ContentScanningMimeTypeIdentifier scanningIdentifier : 
this.contentScanners) {
+                 if (scanningIdentifier.isEnabled(context)) {
+                     session.read(flowFile, new InputStreamCallback() {
+                         @Override
+                         public void process(final InputStream in) throws 
IOException {
+                             String mimeType = 
scanningIdentifier.getMimeType(in);
+                             if (mimeType != null) {
+                                 mimeTypeRef.set(mimeType);
+                             }
+                         }
+                     });
+ 
+                     if (mimeTypeRef.get() != null) {
+                         break;
+                     }
+                 }
+             }
+         }
+ 
+         mimeType = mimeTypeRef.get();
+         if (mimeType == null) {
+             flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), "application/octet-stream");
+             logger.info("Unable to identify MIME Type for {}; setting to 
application/octet-stream", new Object[]{flowFile});
+         } else {
+             flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), mimeType);
+             logger.info("Identified {} as having MIME Type {}", new 
Object[]{flowFile, mimeType});
+         }
+ 
+         session.getProvenanceReporter().modifyAttributes(flowFile);
+         session.transfer(flowFile, REL_SUCCESS);
+     }
+ 
    /**
     * Strategy for identifying a MIME type by scanning flowfile content,
     * used as a fallback when no magic header matches.
     */
    private static interface ContentScanningMimeTypeIdentifier {

        // True when the processor property that enables this scanner is set.
        boolean isEnabled(ProcessContext context);

        // Returns the detected MIME type, or null when the content does not match.
        String getMimeType(InputStream in) throws IOException;
    }
+ 
+     private static class ZipIdentifier implements 
ContentScanningMimeTypeIdentifier {
+ 
+         @Override
+         public String getMimeType(final InputStream in) throws IOException {
+             final ZipInputStream zipIn = new ZipInputStream(in);
+             try {
+                 if (zipIn.getNextEntry() != null) {
+                     return "application/zip";
+                 }
+             } catch (final Exception e) {
+             }
+             return null;
+         }
+ 
+         @Override
+         public boolean isEnabled(final ProcessContext context) {
+             return context.getProperty(IDENTIFY_ZIP).asBoolean();
+         }
+     }
+ 
+     private static class TarIdentifier implements 
ContentScanningMimeTypeIdentifier {
+ 
+         @Override
+         public String getMimeType(final InputStream in) throws IOException {
+             try (final TarArchiveInputStream tarIn = new 
TarArchiveInputStream(in)) {
+                 final TarArchiveEntry firstEntry = tarIn.getNextTarEntry();
+                 if (firstEntry != null) {
+                     if 
(firstEntry.getName().equals(FlowFilePackagerV1.FILENAME_ATTRIBUTES)) {
+                         final TarArchiveEntry secondEntry = 
tarIn.getNextTarEntry();
+                         if (secondEntry != null && 
secondEntry.getName().equals(FlowFilePackagerV1.FILENAME_CONTENT)) {
+                             return "application/flowfile-v1";
+                         }
+                     }
+                     return "application/tar";
+                 }
+             } catch (final Exception e) {
+             }
+             return null;
+         }
+ 
+         @Override
+         public boolean isEnabled(final ProcessContext context) {
+             return context.getProperty(IDENTIFY_TAR).asBoolean();
+         }
+     }
+ 
    /**
     * A magic-header signature: a byte pattern that, when present in a
     * content prefix, identifies a specific MIME type.
     */
    private static interface MagicHeader {

        // Minimum number of prefix bytes needed to evaluate this signature.
        int getRequiredBufferLength();

        // MIME type reported when this signature matches.
        String getMimeType();

        // True when the buffered content prefix matches this signature.
        boolean matches(final byte[] header);
    }
+ 
+     private static class SimpleMagicHeader implements MagicHeader {
+ 
+         private final String mimeType;
+         private final int offset;
+         private final byte[] byteSequence;
+ 
+         public SimpleMagicHeader(final String mimeType, final byte[] 
byteSequence) {
+             this(mimeType, byteSequence, 0);
+         }
+ 
+         public SimpleMagicHeader(final String mimeType, final byte[] 
byteSequence, final int offset) {
+             this.mimeType = mimeType;
+             this.byteSequence = byteSequence;
+             this.offset = offset;
+         }
+ 
+         @Override
+         public int getRequiredBufferLength() {
+             return byteSequence.length + offset;
+         }
+ 
+         @Override
+         public String getMimeType() {
+             return mimeType;
+         }
+ 
+         @Override
+         public boolean matches(final byte[] header) {
+             if (header.length < getRequiredBufferLength()) {
+                 return false;
+             }
+ 
+             for (int i = 0; i < byteSequence.length; i++) {
+                 if (byteSequence[i] != header[offset + i]) {
+                     return false;
+                 }
+             }
+ 
+             return true;
+         }
+     }
+ 
+     private static class CompoundMagicHeader implements MagicHeader {
+ 
+         private final MagicHeader[] headers;
+         private final int requiredLength;
+         private final String mimeType;
+ 
+         public CompoundMagicHeader(final String mimeType, final 
MagicHeader... headers) {
+             this.mimeType = mimeType;
+             this.headers = headers;
+ 
+             int max = 0;
+             for (final MagicHeader header : headers) {
+                 max = Math.max(max, header.getRequiredBufferLength());
+             }
+ 
+             this.requiredLength = max;
+         }
+ 
+         @Override
+         public int getRequiredBufferLength() {
+             return requiredLength;
+         }
+ 
+         @Override
+         public String getMimeType() {
+             return mimeType;
+         }
+ 
+         @Override
+         public boolean matches(final byte[] header) {
+             for (final MagicHeader mh : headers) {
+                 if (!mh.matches(header)) {
+                     return false;
+                 }
+             }
+ 
+             return true;
+         }
+ 
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
index 0000000,e5c1ed8..41ecd91
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
@@@ -1,0 -1,758 +1,758 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import static org.apache.commons.lang3.StringUtils.*;
+ 
+ import java.io.BufferedInputStream;
+ import java.io.BufferedOutputStream;
+ import java.io.ByteArrayOutputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.net.HttpURLConnection;
+ import java.net.URL;
+ import java.nio.charset.Charset;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Locale;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.regex.Matcher;
+ import java.util.regex.Pattern;
+ 
+ import javax.net.ssl.HostnameVerifier;
+ import javax.net.ssl.HttpsURLConnection;
+ import javax.net.ssl.SSLContext;
+ import javax.net.ssl.SSLSession;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.ssl.SSLContextService;
+ import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+ import org.apache.commons.lang3.StringUtils;
+ import org.joda.time.format.DateTimeFormat;
+ import org.joda.time.format.DateTimeFormatter;
+ 
+ @SupportsBatching
+ @Tags({"http", "https", "rest", "client"})
+ @CapabilityDescription("An HTTP client processor which converts FlowFile 
attributes to HTTP headers, with configurable HTTP method, url, etc.")
+ public final class InvokeHTTP extends AbstractProcessor {
+ 
+     //-- properties --//
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         Set<String> contextIdentifiers = 
getControllerServiceLookup().getControllerServiceIdentifiers(SSLContextService.class);
+ 
+         PropertyDescriptor contextServiceSelector = new 
PropertyDescriptor.Builder()
+                 .fromPropertyDescriptor(Config.PROP_SSL_CONTEXT_SERVICE)
+                 .allowableValues(contextIdentifiers)
+                 .build();
+ 
+         List<PropertyDescriptor> list = new ArrayList<>(Config.PROPERTIES);
+         list.add(2, contextServiceSelector);
+ 
+         return Collections.unmodifiableList(list);
+     }
+ 
+     @Override
+     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String 
propertyDescriptorName) {
+         if 
(Config.PROP_TRUSTED_HOSTNAME.getName().equalsIgnoreCase(propertyDescriptorName))
 {
+             return Config.PROP_TRUSTED_HOSTNAME;
+         }
+         return 
super.getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
+     }
+ 
+     //-- relationships --//
    /** Returns the fixed relationship set declared in {@link Config}. */
    @Override
    public Set<Relationship> getRelationships() {
        return Config.RELATIONSHIPS;
    }
+ 
+     //-- class properties --//
+     final AtomicReference<SSLContext> sslContextRef = new AtomicReference<>();
+     final AtomicReference<Pattern> attributesToSendRef = new 
AtomicReference<>();
+ 
    /**
     * Framework callback fired when a property value changes. Caches the
     * derived SSLContext and the compiled attributes-to-send Pattern so they
     * are not recomputed on every onTrigger.
     */
    @Override
    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        newValue = StringUtils.trimToEmpty(newValue);

        // listen for the SSL Context Service property and retrieve the SSLContext from the controller service.
        if (Config.PROP_SSL_CONTEXT_SERVICE.getName().equalsIgnoreCase(descriptor.getName())) {
            if (newValue.isEmpty()) {
                sslContextRef.set(null);
            } else {
                // NOTE(review): getControllerService can return null for an unknown
                // identifier, which would NPE on the next line — TODO confirm the
                // framework guarantees a valid id here.
                SSLContextService svc = (SSLContextService) getControllerServiceLookup().getControllerService(newValue);
                sslContextRef.set(svc.createSSLContext(ClientAuth.NONE));  // ClientAuth is only useful for servers, not clients.
                getLogger().info("Loading SSL configuration from keystore={} and truststore={}", new Object[]{svc.getKeyStoreFile(), svc.getTrustStoreFile()});
            }
        }

        // compile the attributes-to-send filter pattern (the property's regex
        // validator should have rejected uncompilable values before this point)
        if (Config.PROP_ATTRIBUTES_TO_SEND.getName().equalsIgnoreCase(descriptor.getName())) {
            if (newValue.isEmpty()) {
                attributesToSendRef.set(null);
            } else {
                attributesToSendRef.set(Pattern.compile(newValue));
            }
        }

    }
+ 
+     //-- processing --//
+     @Override
+     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+         List<FlowFile> flowfiles = session.get(Config.MAX_RESULTS_PER_THREAD);
+         if (flowfiles.isEmpty()) {
+             context.yield();
+             return;
+         }
+ 
+         for (FlowFile flowfile : flowfiles) {
+             Transaction transaction = new Transaction(
+                     getLogger(), sslContextRef, attributesToSendRef, context, 
session, flowfile
+             );
+             transaction.process();
+         }
+     }
+ 
+     /**
+      *
+      * Stores properties, relationships, configuration values, hard coded
+      * strings, magic numbers, etc.
+      *
+      *
+      */
+     public interface Config {
+ 
+         //-- magic numbers --//
+         int MAX_RESULTS_PER_THREAD = 50;
+ 
+         //-- flowfile attribute keys returned after reading the response --//
+         String STATUS_CODE = "invokehttp.status.code";
+         String STATUS_MESSAGE = "invokehttp.status.message";
+         String RESPONSE_BODY = "invokehttp.response.body";
+         String REQUEST_URL = "invokehttp.request.url";
+         String TRANSACTION_ID = "invokehttp.tx.id";
+         String REMOTE_DN = "invokehttp.remote.dn";
+ 
+         // Set of flowfile attributes which we generally always ignore during
+         // processing, including when converting http headers, copying 
attributes, etc.
+         // This set includes our strings defined above as well as some 
standard flowfile
+         // attributes.
+         Set<String> IGNORED_ATTRIBUTES = Collections.unmodifiableSet(new 
HashSet<>(Arrays.asList(
+                 STATUS_CODE, STATUS_MESSAGE, RESPONSE_BODY, REQUEST_URL, 
TRANSACTION_ID, REMOTE_DN,
+                 "uuid", "filename", "path"
+         )));
+ 
+         //-- properties --//
+         PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
+                 .name("HTTP Method")
+                 .description("HTTP request method (GET, POST, PUT, DELETE, 
HEAD, OPTIONS).")
+                 .required(true)
+                 .defaultValue("GET")
+                 .expressionLanguageSupported(true)
+                 .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                 .build();
+ 
+         PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder()
+                 .name("Remote URL")
+                 .description("Remote URL which will be connected to, 
including scheme, host, port, path.")
+                 .required(true)
+                 .expressionLanguageSupported(true)
+                 .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                 .build();
+ 
+         PropertyDescriptor PROP_CONNECT_TIMEOUT = new 
PropertyDescriptor.Builder()
+                 .name("Connection Timeout")
+                 .description("Max wait time for connection to remote 
service.")
+                 .required(true)
+                 .defaultValue("5 secs")
+                 .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+                 .build();
+ 
+         PropertyDescriptor PROP_READ_TIMEOUT = new 
PropertyDescriptor.Builder()
+                 .name("Read Timeout")
+                 .description("Max wait time for response from remote 
service.")
+                 .required(true)
+                 .defaultValue("15 secs")
+                 .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+                 .build();
+ 
+         PropertyDescriptor PROP_DATE_HEADER = new PropertyDescriptor.Builder()
+                 .name("Include Date Header")
+                 .description("Include an RFC-2616 Date header in the 
request.")
+                 .required(true)
+                 .defaultValue("True")
+                 .allowableValues("True", "False")
+                 .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                 .build();
+ 
+         PropertyDescriptor PROP_FOLLOW_REDIRECTS = new 
PropertyDescriptor.Builder()
+                 .name("Follow Redirects")
+                 .description("Follow HTTP redirects issued by remote server.")
+                 .required(true)
+                 .defaultValue("True")
+                 .allowableValues("True", "False")
+                 .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                 .build();
+ 
+         PropertyDescriptor PROP_ATTRIBUTES_TO_SEND = new 
PropertyDescriptor.Builder()
+                 .name("Attributes to Send")
+                 .description("Regular expression that defines which 
attributes to send as HTTP headers in the request. "
+                         + "If not defined, no attributes are sent as 
headers.")
+                 .required(false)
+                 .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+                 .build();
+ 
+         List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+                 PROP_METHOD,
+                 PROP_URL,
+                 PROP_CONNECT_TIMEOUT,
+                 PROP_READ_TIMEOUT,
+                 PROP_DATE_HEADER,
+                 PROP_FOLLOW_REDIRECTS,
+                 PROP_ATTRIBUTES_TO_SEND
+         ));
+ 
+         // The allowableValues of the SSL Context Service property is 
dynamically populated at run time.
+         PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
+                 .name("SSL Context Service")
+                 .description("The SSL Context Service used to provide client 
certificate information for TLS/SSL (https) connections.")
+                 .required(false)
+                 
.addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
+                 .build();
+ 
+         // property to allow the hostname verifier to be overridden
+         // this is a "hidden" property - it's configured using a dynamic user 
property
+         PropertyDescriptor PROP_TRUSTED_HOSTNAME = new 
PropertyDescriptor.Builder()
+                 .name("Trusted Hostname")
+                 .description("Bypass the normal truststore hostname verifier 
to allow the specified (single) remote hostname as trusted "
+                         + "Enabling this property has MITM security 
implications, use wisely. Only valid with SSL (HTTPS) connections.")
+                 .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                 .dynamic(true)
+                 .build();
+ 
+         //-- relationships --//
+         Relationship REL_SUCCESS_REQ = new Relationship.Builder()
+                 .name("Original")
+                 .description("Original FlowFile will be routed upon success 
(2xx status codes).")
+                 .build();
+ 
+         Relationship REL_SUCCESS_RESP = new Relationship.Builder()
+                 .name("Response")
+                 .description("Response FlowFile will be routed upon success 
(2xx status codes).")
+                 .build();
+ 
+         Relationship REL_RETRY = new Relationship.Builder()
+                 .name("Retry")
+                 .description("FlowFile will be routed on any status code that 
can be retried (5xx status codes).")
+                 .build();
+ 
+         Relationship REL_NO_RETRY = new Relationship.Builder()
+                 .name("No Retry")
+                 .description("FlowFile will be routed on any status code that 
should NOT be retried (1xx, 3xx, 4xx status codes).")
+                 .build();
+ 
+         Relationship REL_FAILURE = new Relationship.Builder()
+                 .name("Failure")
+                 .description("FlowFile will be routed on any type of 
connection failure, timeout or general exception.")
+                 .build();
+ 
+         Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new 
HashSet<>(Arrays.asList(
+                 REL_SUCCESS_REQ, REL_SUCCESS_RESP, REL_RETRY, REL_NO_RETRY, 
REL_FAILURE
+         )));
+ 
+     }
+ 
+     /**
+      * A single invocation of an HTTP request/response from the InvokeHTTP
+      * processor. This class encapsulates the entirety of the flowfile
+      * processing.
+      * <p>
+      * This class is not thread safe and is created new for every flowfile
+      * processed.
+      */
+     private static class Transaction implements Config {
+ 
+         /**
+          * Pattern used to compute RFC 2616 Dates (#sec3.3.1). This format is
+          * used by the HTTP Date header and is optionally sent by the 
processor.
+          * This date is effectively an RFC 822/1123 date string, but HTTP
+          * requires it to be in GMT (preferring the literal 'GMT' string).
+          */
+         private static final String rfc1123 = "EEE, dd MMM yyyy HH:mm:ss 
'GMT'";
+         private static final DateTimeFormatter dateFormat = 
DateTimeFormat.forPattern(rfc1123).withLocale(Locale.US).withZoneUTC();
+ 
+         /**
+          * Every request/response cycle from this client has a unique
+          * transaction id which will be stored as a flowfile attribute. This
+          * generator is used to create the id.
+          */
+         private static final AtomicLong txIdGenerator = new AtomicLong();
+ 
+         private static final Charset utf8 = Charset.forName("UTF-8");
+ 
+         private final ProcessorLog logger;
+         private final AtomicReference<SSLContext> sslContextRef;
+         private final AtomicReference<Pattern> attributesToSendRef;
+         private final ProcessContext context;
+         private final ProcessSession session;
+ 
+         private final long txId = txIdGenerator.incrementAndGet();
+ 
+         private FlowFile request;
+         private FlowFile response;
+         private HttpURLConnection conn;
+ 
+         private int statusCode;
+         private String statusMessage;
+ 
        /**
         * Creates a single-use transaction for one flowfile.
         *
         * @param logger processor logger used for all transaction logging
         * @param sslContextRef shared holder of the SSLContext (may hold null)
         * @param attributesToSendRef shared holder of the compiled header filter (may hold null)
         * @param context processor context for property access
         * @param session session owning the request flowfile
         * @param request the flowfile whose attributes/content form the HTTP request
         */
        public Transaction(
                ProcessorLog logger,
                AtomicReference<SSLContext> sslContextRef,
                AtomicReference<Pattern> attributesToSendRef,
                ProcessContext context,
                ProcessSession session,
                FlowFile request) {

            this.logger = logger;
            this.sslContextRef = sslContextRef;
            this.attributesToSendRef = attributesToSendRef;
            this.context = context;
            this.session = session;
            this.request = request;
        }
+ 
        /**
         * Runs the full request/response cycle. Any error — including Errors,
         * since Throwable is caught deliberately so one bad flowfile cannot
         * kill the processor thread — routes the original flowfile to failure
         * and discards any partially-built response flowfile.
         */
        public void process() {

            try {
                openConnection();
                sendRequest();
                readResponse();
                transfer();

            } catch (Throwable t) {
                // log exception
                logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), t}, t);

                // penalize
                request = session.penalize(request);

                // transfer original to failure
                session.transfer(request, REL_FAILURE);

                // cleanup response flowfile, if applicable
                try {
                    if (response != null) {
                        session.remove(response);
                    }
                } catch (Throwable t1) {
                    logger.error("Could not cleanup response flowfile due to exception: {}", new Object[]{t1}, t1);
                }

            }
        }
+ 
+         private void openConnection() throws IOException {
+             // read the url property from the context
+             String urlstr = 
trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(request).getValue());
+             URL url = new URL(urlstr);
+ 
+             // create the connection
+             conn = (HttpURLConnection) url.openConnection();
+ 
+             // set the request method
+             String method = 
trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(request).getValue()).toUpperCase();
+             conn.setRequestMethod(method);
+ 
+             // set timeouts
+             
conn.setConnectTimeout(context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+             
conn.setReadTimeout(context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+ 
+             // set whether to follow redirects
+             
conn.setInstanceFollowRedirects(context.getProperty(PROP_FOLLOW_REDIRECTS).asBoolean());
+ 
+             // special handling for https
+             if (conn instanceof HttpsURLConnection) {
+                 HttpsURLConnection sconn = (HttpsURLConnection) conn;
+ 
+                 // check if the ssl context is set
+                 SSLContext sslContext = sslContextRef.get();
+                 if (sslContext != null) {
+                     sconn.setSSLSocketFactory(sslContext.getSocketFactory());
+                 }
+ 
+                 // check the trusted hostname property and override the 
HostnameVerifier
+                 String trustedHostname = 
trimToEmpty(context.getProperty(PROP_TRUSTED_HOSTNAME).getValue());
+                 if (!trustedHostname.isEmpty()) {
+                     sconn.setHostnameVerifier(new 
OverrideHostnameVerifier(trustedHostname, sconn.getHostnameVerifier()));
+                 }
+             }
+ 
+         }
+ 
+         private void sendRequest() throws IOException {
+             // set the http request properties using flowfile attribute values
+             setRequestProperties();
+ 
+             // log request
+             logRequest();
+ 
+             // we only stream data for POST and PUT requests
+             String method = conn.getRequestMethod().toUpperCase();
+             if ("POST".equals(method) || "PUT".equals(method)) {
+                 conn.setDoOutput(true);
+                 conn.setFixedLengthStreamingMode(request.getSize());
+ 
+                 // write the flowfile contents to the output stream
+                 try (OutputStream os = new 
BufferedOutputStream(conn.getOutputStream())) {
+                     session.exportTo(request, os);
+                 }
+ 
+                 // emit provenance event
+                 session.getProvenanceReporter().send(request, 
conn.getURL().toExternalForm());
+             }
+ 
+         }
+ 
+         private void readResponse() throws IOException {
+ 
+             // output the raw response headers (DEBUG level only)
+             logResponse();
+ 
+             // store the status code and message
+             statusCode = conn.getResponseCode();
+             statusMessage = conn.getResponseMessage();
+ 
+             // always write the status attributes to the request flowfile
+             request = writeStatusAttributes(request);
+ 
+             // read from the appropriate input stream
+             try (InputStream is = getResponseStream()) {
+ 
+                 // if not successful, store the response body into a flowfile 
attribute
+                 if (!isSuccess()) {
+                     String body = trimToEmpty(toString(is, utf8));
+                     request = session.putAttribute(request, RESPONSE_BODY, 
body);
+                 }
+ 
+                 // if successful, store the response body as the flowfile 
payload
+                 // we include additional flowfile attributes including the 
reponse headers
+                 // and the status codes.
+                 if (isSuccess()) {
+                     // clone the flowfile to capture the response
+                     response = session.clone(request);
+ 
+                     // write the status attributes
+                     response = writeStatusAttributes(response);
+ 
+                     // write the response headers as attributes
+                     // this will overwrite any existing flowfile attributes
+                     response = session.putAllAttributes(response, 
convertAttributesFromHeaders());
+ 
+                     // transfer the message body to the payload
+                     // can potentially be null in edge cases
+                     if (is != null) {
+                         response = session.importFrom(is, response);
+                     }
+ 
+                     // invoke provenance events
+                     session.getProvenanceReporter().receive(response, 
conn.getURL().toExternalForm());
+ 
+                 }
+ 
+             }
+ 
+         }
+ 
+         private void transfer() throws IOException {
+             // check if we should penalize the request
+             if (!isSuccess()) {
+                 request = session.penalize(request);
+             }
+ 
+             // log the status codes from the response
+             logger.info("Request to {} returned status code {} for {}", new 
Object[]{
+                 conn.getURL().toExternalForm(),
+                 statusCode,
+                 request
+             });
+ 
+             // transfer to the correct relationship
+             // 2xx -> SUCCESS
+             if (isSuccess()) {
+                 // we have two flowfiles to transfer
+                 session.transfer(request, REL_SUCCESS_REQ);
+                 session.transfer(response, REL_SUCCESS_RESP);
+ 
+                 // 5xx -> RETRY
+             } else if (statusCode / 100 == 5) {
+                 session.transfer(request, REL_RETRY);
+ 
+                 // 1xx, 3xx, 4xx -> NO RETRY
+             } else {
+                 session.transfer(request, REL_NO_RETRY);
+             }
+ 
+         }
+ 
+         private void setRequestProperties() {
+ 
+             // check if we should send the a Date header with the request
+             if (context.getProperty(PROP_DATE_HEADER).asBoolean()) {
+                 conn.setRequestProperty("Date", getDateValue());
+             }
+ 
+             // iterate through the flowfile attributes, adding any attribute 
that
+             // matches the attributes-to-send pattern. if the pattern is not 
set
+             // (it's an optional property), ignore that attribute entirely
+             Pattern p = attributesToSendRef.get();
+             if (p != null) {
+                 Map<String, String> attributes = request.getAttributes();
+                 Matcher m = p.matcher("");
+                 for (Map.Entry<String, String> entry : attributes.entrySet()) 
{
+                     String key = trimToEmpty(entry.getKey());
+                     String val = trimToEmpty(entry.getValue());
+ 
+                     // don't include any of the ignored attributes
+                     if (IGNORED_ATTRIBUTES.contains(key)) {
+                         continue;
+                     }
+ 
+                     // check if our attribute key matches the pattern
+                     // if so, include in the request as a header
+                     m.reset(key);
+                     if (m.matches()) {
+                         conn.setRequestProperty(key, val);
+                     }
+                 }
+             }
+         }
+ 
+         /**
+          * Returns a Map of flowfile attributes from the response http 
headers.
+          * Multivalue headers are naively converted to comma separated 
strings.
+          */
+         private Map<String, String> convertAttributesFromHeaders() throws 
IOException {
+             // create a new hashmap to store the values from the connection
+             Map<String, String> map = new HashMap<>();
+             for (Map.Entry<String, List<String>> entry : 
conn.getHeaderFields().entrySet()) {
+                 String key = entry.getKey();
+                 if (key == null) {
+                     continue;
+                 }
+ 
+                 List<String> values = entry.getValue();
+ 
+                 // we ignore any headers with no actual values (rare)
+                 if (values == null || values.isEmpty()) {
+                     continue;
+                 }
+ 
+                 // create a comma separated string from the values, this is 
stored in the map
+                 String value = csv(values);
+ 
+                 // put the csv into the map
+                 map.put(key, value);
+             }
+ 
+             if (conn instanceof HttpsURLConnection) {
+                 HttpsURLConnection sconn = (HttpsURLConnection) conn;
+                 // this should seemingly not be required, but somehow the 
state of the jdk client is messed up
+                 // when retrieving SSL certificate related information if 
connect() has not been called previously.
+                 sconn.connect();
+                 map.put(REMOTE_DN, sconn.getPeerPrincipal().getName());
+             }
+ 
+             return map;
+         }
+ 
+         private boolean isSuccess() throws IOException {
+             if (statusCode == 0) {
+                 throw new IllegalStateException("Status code unknown, 
connection hasn't been attempted.");
+             }
+             return statusCode / 100 == 2;
+         }
+ 
+         private void logRequest() {
+             logger.debug("\nRequest to remote service:\n\t{}\n{}", new 
Object[]{
+                 conn.getURL().toExternalForm(),
+                 getLogString(conn.getRequestProperties())
+             });
+         }
+ 
+         private void logResponse() {
+             logger.debug("\nResponse from remote service:\n\t{}\n{}", new 
Object[]{
+                 conn.getURL().toExternalForm(),
+                 getLogString(conn.getHeaderFields())
+             });
+         }
+ 
+         private String getLogString(Map<String, List<String>> map) {
+             StringBuilder sb = new StringBuilder();
+             for (Map.Entry<String, List<String>> entry : map.entrySet()) {
+                 List<String> list = entry.getValue();
+                 if (list.isEmpty()) {
+                     continue;
+                 }
+                 sb.append("\t");
+                 sb.append(entry.getKey());
+                 sb.append(": ");
+                 if (list.size() == 1) {
+                     sb.append(list.get(0));
+                 } else {
+                     sb.append(list.toString());
+                 }
+                 sb.append("\n");
+             }
+             return sb.toString();
+         }
+ 
+         /**
+          * Convert a collection of string values into a overly simple comma
+          * separated string.
+          *
+          * Does not handle the case where the value contains the delimiter. 
i.e.
+          * if a value contains a comma, this method does nothing to try and
+          * escape or quote the value, in traditional csv style.
+          */
+         private String csv(Collection<String> values) {
+             if (values == null || values.isEmpty()) {
+                 return "";
+             }
+             if (values.size() == 1) {
+                 return values.iterator().next();
+             }
+ 
+             StringBuilder sb = new StringBuilder();
+             for (String value : values) {
+                 value = value.trim();
+                 if (value.isEmpty()) {
+                     continue;
+                 }
+                 if (sb.length() > 0) {
+                     sb.append(", ");
+                 }
+                 sb.append(value);
+             }
+             return sb.toString().trim();
+         }
+ 
+         /**
+          * Return the current datetime as an RFC 1123 formatted string in the
+          * GMT tz.
+          */
+         private String getDateValue() {
+             return dateFormat.print(System.currentTimeMillis());
+         }
+ 
+         /**
+          * Returns a string from the input stream using the specified 
character
+          * encoding.
+          */
+         private String toString(InputStream is, Charset charset) throws 
IOException {
+             if (is == null) {
+                 return "";
+             }
+ 
+             ByteArrayOutputStream out = new ByteArrayOutputStream();
+             byte[] buf = new byte[4096];
+             int len;
+             while ((len = is.read(buf)) != -1) {
+                 out.write(buf, 0, len);
+             }
+             return new String(out.toByteArray(), charset);
+         }
+ 
+         /**
+          * Returns the input stream to use for reading from the remote server.
+          * We're either going to want the inputstream or errorstream,
+          * effectively depending on the status code.
+          * <p>
+          * This method can return null if there is no inputstream to read 
from.
+          * For example, if the remote server did not send a message body. eg.
+          * 204 No Content or 304 Not Modified
+          */
+         private InputStream getResponseStream() {
+             try {
+                 InputStream is = conn.getErrorStream();
+                 if (is == null) {
+                     is = conn.getInputStream();
+                 }
+                 return new BufferedInputStream(is);
+ 
+             } catch (IOException e) {
+                 logger.warn("Response stream threw an exception: {}", new 
Object[]{e}, e);
+                 return null;
+             }
+         }
+ 
+         /**
+          * Writes the status attributes onto the flowfile, returning the
+          * flowfile that was updated.
+          */
+         private FlowFile writeStatusAttributes(FlowFile flowfile) {
+             flowfile = session.putAttribute(flowfile, STATUS_CODE, 
String.valueOf(statusCode));
+             flowfile = session.putAttribute(flowfile, STATUS_MESSAGE, 
statusMessage);
+             flowfile = session.putAttribute(flowfile, REQUEST_URL, 
conn.getURL().toExternalForm());
+             flowfile = session.putAttribute(flowfile, TRANSACTION_ID, 
Long.toString(txId));
+             return flowfile;
+         }
+ 
+         /**
+          *
+          */
+         private static class OverrideHostnameVerifier implements 
HostnameVerifier {
+ 
+             private final String trustedHostname;
+             private final HostnameVerifier delegate;
+ 
+             private OverrideHostnameVerifier(String trustedHostname, 
HostnameVerifier delegate) {
+                 this.trustedHostname = trustedHostname;
+                 this.delegate = delegate;
+             }
+ 
+             @Override
+             public boolean verify(String hostname, SSLSession session) {
+                 if (trustedHostname.equalsIgnoreCase(hostname)) {
+                     return true;
+                 }
+                 return delegate.verify(hostname, session);
+             }
+         }
+     }
+ 
+ }

Reply via email to