http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashAttribute.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashAttribute.java index 0000000,5fc7e5a..e39ab26 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashAttribute.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashAttribute.java @@@ -1,0 -1,255 +1,255 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import java.util.ArrayList; + import java.util.Collections; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.SortedMap; + import java.util.TreeMap; + import java.util.concurrent.atomic.AtomicReference; + import java.util.regex.Matcher; + import java.util.regex.Pattern; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.EventDriven; -import org.apache.nifi.processor.annotation.SideEffectFree; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.behavior.EventDriven; ++import org.apache.nifi.annotation.behavior.SideEffectFree; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processor.util.StandardValidators; + + import org.apache.commons.codec.digest.DigestUtils; + import org.apache.commons.lang3.StringUtils; + + /** + * <p> + * This processor identifies groups of user-specified flowfile attributes and + * assigns a unique hash value to each group, recording this hash value in the + * flowfile's attributes using a user-specified attribute key. The groups are + * identified dynamically and preserved across application restarts. 
</p> + * + * <p> + * The user must supply optional processor properties during runtime to + * correctly configure this processor. The optional property key will be used as + * the flowfile attribute key for attribute inspection. The value must be a + * valid regular expression. This regular expression is evaluated against the + * flowfile attribute values. If the regular expression contains a capturing + * group, the value of that group will be used when comparing flow file + * attributes. Otherwise, the original flow file attribute's value will be used + * if and only if the value matches the given regular expression. </p> + * + * <p> + * If a flowfile does not have an attribute entry for one or more processor + * configured values, then the flowfile is routed to failure. </p> + * + * <p> + * An example hash value identification: + * + * Assume Processor Configured with Two Properties ("MDKey1" = ".*" and "MDKey2" + * = "(.).*"). + * + * FlowFile 1 has the following attributes: MDKey1 = a MDKey2 = b + * + * and will be assigned to group 1 (since no groups exist yet) + * + * FlowFile 2 has the following attributes: MDKey1 = 1 MDKey2 = 2 + * + * and will be assigned to group 2 (attribute keys do not match existing groups) + * + * FlowFile 3 has the following attributes: MDKey1 = a MDKey2 = z + * + * and will be assigned to group 3 (attribute keys do not match existing groups) + * + * FlowFile 4 has the following attribute: MDKey1 = a MDKey2 = bad + * + * and will be assigned to group 1 (because the value of MDKey1 has the regular + * expression ".*" applied to it, and that evaluates to the same as MDKey1 + * attribute of the first flow file. Similarly, the capturing group for the + * MDKey2 property indicates that only the first character of the MDKey2 + * attribute must match, and the first character of MDKey2 for Flow File 1 and + * Flow File 4 are both 'b'.) 
+ * + * FlowFile 5 has the following attributes: MDKey1 = a + * + * and will route to failure because it does not have MDKey2 entry in its + * attribute + * </p> + * + * <p> + * The following flow file attributes are created or modified: <ul> + * <li><b><group.id.attribute.key></b> - The hash value.</li> </ul> </p> + */ + @EventDriven + @SideEffectFree + @SupportsBatching + @Tags({"attributes", "hash"}) + @CapabilityDescription("Hashes together the key/value pairs of several FlowFile Attributes and adds the hash as a new attribute. " + + "Optional properties are to be added such that the name of the property is the name of a FlowFile Attribute to consider " + + "and the value of the property is a regular expression that, if matched by the attribute value, will cause that attribute " + + "to be used as part of the hash. If the regular expression contains a capturing group, only the value of the capturing " + + "group will be used.") + public class HashAttribute extends AbstractProcessor { + + public static final PropertyDescriptor HASH_VALUE_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("Hash Value Attribute Key") + .description("The name of the FlowFile Attribute where the hash value should be stored") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Used for FlowFiles that have a hash value added").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Used for FlowFiles that are missing required attributes").build(); + + private Set<Relationship> relationships; + private List<PropertyDescriptor> properties; + private final AtomicReference<Map<String, Pattern>> regexMapRef = new AtomicReference<>(Collections.<String, Pattern>emptyMap()); + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set<Relationship> 
relationships = new HashSet<>(); + relationships.add(REL_FAILURE); + relationships.add(REL_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(HASH_VALUE_ATTRIBUTE); + this.properties = Collections.unmodifiableList(properties); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .addValidator(StandardValidators.createRegexValidator(0, 1, false)) + .required(false) + .dynamic(true) + .build(); + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if (descriptor.isRequired()) { + return; + } + + final Map<String, Pattern> patternMap = new HashMap<>(regexMapRef.get()); + if (newValue == null) { + patternMap.remove(descriptor.getName()); + } else { + if (newValue.equals(".*")) { + patternMap.put(descriptor.getName(), null); + } else { + final Pattern pattern = Pattern.compile(newValue); + patternMap.put(descriptor.getName(), pattern); + } + } + + regexMapRef.set(Collections.unmodifiableMap(patternMap)); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final Map<String, Pattern> patterns = regexMapRef.get(); + final ProcessorLog logger = getLogger(); + + final SortedMap<String, String> attributes = getRelevantAttributes(flowFile, patterns); + if (attributes.size() != patterns.size()) { + final Set<String> wantedKeys = patterns.keySet(); + final Set<String> foundKeys = attributes.keySet(); + final 
StringBuilder missingKeys = new StringBuilder(); + for (final String wantedKey : wantedKeys) { + if (!foundKeys.contains(wantedKey)) { + missingKeys.append(wantedKey).append(" "); + } + } + + logger.error("routing {} to 'failure' because of missing attributes: {}", new Object[]{flowFile, missingKeys.toString()}); + session.transfer(flowFile, REL_FAILURE); + } else { + // create single string of attribute key/value pairs to use for group ID hash + final StringBuilder hashableValue = new StringBuilder(); + for (final Map.Entry<String, String> entry : attributes.entrySet()) { + hashableValue.append(entry.getKey()); + if (StringUtils.isBlank(entry.getValue())) { + hashableValue.append("EMPTY"); + } else { + hashableValue.append(entry.getValue()); + } + } + + // create group ID + final String hashValue = DigestUtils.md5Hex(hashableValue.toString()); + + logger.info("adding Hash Value {} to attributes for {} and routing to success", new Object[]{hashValue, flowFile}); + flowFile = session.putAttribute(flowFile, context.getProperty(HASH_VALUE_ATTRIBUTE).getValue(), hashValue); + session.getProvenanceReporter().modifyAttributes(flowFile); + session.transfer(flowFile, REL_SUCCESS); + } + } + + private SortedMap<String, String> getRelevantAttributes(final FlowFile flowFile, final Map<String, Pattern> patterns) { + final SortedMap<String, String> attributeMap = new TreeMap<>(); + for (final Map.Entry<String, Pattern> entry : patterns.entrySet()) { + final String attributeName = entry.getKey(); + final String attributeValue = flowFile.getAttribute(attributeName); + if (attributeValue != null) { + final Pattern pattern = entry.getValue(); + if (pattern == null) { + attributeMap.put(attributeName, attributeValue); + } else { + final Matcher matcher = pattern.matcher(attributeValue); + if (matcher.matches()) { + if (matcher.groupCount() == 0) { + attributeMap.put(attributeName, matcher.group(0)); + } else { + attributeMap.put(attributeName, matcher.group(1)); + } + } + } + } + } 
+ + return attributeMap; + } + }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java index 0000000,bb71589..827653b mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java @@@ -1,0 -1,151 +1,151 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import java.io.IOException; + import java.io.InputStream; + import java.security.DigestOutputStream; + import java.security.MessageDigest; + import java.security.NoSuchAlgorithmException; + import java.security.Security; + import java.util.ArrayList; + import java.util.Collections; + import java.util.HashSet; + import java.util.List; + import java.util.Set; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.stream.io.NullOutputStream; + import org.apache.nifi.stream.io.StreamUtils; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.EventDriven; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.behavior.EventDriven; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.io.InputStreamCallback; + import org.apache.nifi.processor.util.StandardValidators; + import org.apache.nifi.util.ObjectHolder; + + @EventDriven + @SupportsBatching + @Tags({"hash", "content", "MD5", "SHA-1", "SHA-256"}) + @CapabilityDescription("Calculates a hash value for the Content of a FlowFile and puts that hash value on the FlowFile as an attribute whose name is determined by the <Hash Attribute Name> property") + public class HashContent extends 
AbstractProcessor { + + public static final PropertyDescriptor ATTRIBUTE_NAME = new PropertyDescriptor.Builder() + .name("Hash Attribute Name") + .description("The name of the FlowFile Attribute into which the Hash Value should be written. If the value already exists, it will be overwritten") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("hash.value") + .build(); + + public static final PropertyDescriptor HASH_ALGORITHM = new PropertyDescriptor.Builder() + .name("Hash Algorithm") + .description("Determines what hashing algorithm should be used to perform the hashing function") + .required(true) + .allowableValues(Security.getAlgorithms("MessageDigest")) + .defaultValue("MD5") + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("FlowFiles that are process successfully will be sent to this relationship").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("Any FlowFile that cannot be processed successfully will be sent to this relationship without any attribute being added").build(); + + private List<PropertyDescriptor> properties; + private Set<Relationship> relationships; + + @Override + protected void init(ProcessorInitializationContext context) { + final List<PropertyDescriptor> props = new ArrayList<>(); + props.add(ATTRIBUTE_NAME); + props.add(HASH_ALGORITHM); + properties = Collections.unmodifiableList(props); + + final Set<Relationship> rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(rels); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { 
+ FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ProcessorLog logger = getLogger(); + final String algorithm = context.getProperty(HASH_ALGORITHM).getValue(); + final MessageDigest digest; + try { + digest = MessageDigest.getInstance(algorithm); + } catch (NoSuchAlgorithmException e) { + logger.error("Failed to process {} due to {}; routing to failure", new Object[]{flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final ObjectHolder<String> hashValueHolder = new ObjectHolder<>(null); + + try { + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + try (final DigestOutputStream digestOut = new DigestOutputStream(new NullOutputStream(), digest)) { + StreamUtils.copy(in, digestOut); + + final byte[] hash = digest.digest(); + final StringBuilder strb = new StringBuilder(hash.length * 2); + for (int i = 0; i < hash.length; i++) { + strb.append(Integer.toHexString((hash[i] & 0xFF) | 0x100).substring(1, 3)); + } + + hashValueHolder.set(strb.toString()); + } + } + }); + + final String attributeName = context.getProperty(ATTRIBUTE_NAME).getValue(); + flowFile = session.putAttribute(flowFile, attributeName, hashValueHolder.get()); + logger.info("Successfully added attribute '{}' to {} with a value of {}; routing to success", new Object[]{attributeName, flowFile, hashValueHolder.get()}); + session.getProvenanceReporter().modifyAttributes(flowFile); + session.transfer(flowFile, REL_SUCCESS); + } catch (final Exception e) { + logger.error("Failed to process {} due to {}; routing to failure", new Object[]{flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/IdentifyMimeType.java 
---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/IdentifyMimeType.java index 0000000,50bdba2..a52ac3b mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/IdentifyMimeType.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/IdentifyMimeType.java @@@ -1,0 -1,474 +1,474 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import java.io.BufferedInputStream; + import java.io.IOException; + import java.io.InputStream; + import java.util.ArrayList; + import java.util.Collections; + import java.util.HashSet; + import java.util.List; + import java.util.Set; + import java.util.zip.ZipInputStream; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.EventDriven; -import org.apache.nifi.processor.annotation.SideEffectFree; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.behavior.EventDriven; ++import org.apache.nifi.annotation.behavior.SideEffectFree; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processor.io.InputStreamCallback; + import org.apache.nifi.util.FlowFilePackagerV1; + import org.apache.nifi.util.FlowFilePackagerV3; + import org.apache.nifi.util.ObjectHolder; + + import org.apache.commons.compress.archivers.tar.TarArchiveEntry; + import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; + + /** + * <p> + * Attempts to detect the MIME Type of a FlowFile by examining its contents. 
If + * the MIME Type is determined, it is added to an attribute with the name + * mime.type + * </p> + * + * <p> + * The following MIME Types are supported: + * + * <ul> + * <li>application/gzip</li> + * <li>application/bzip2</li> + * <li>application/flowfile-v3</li> + * <li>application/flowfile-v1 (requires Identify TAR be set to true)</li> + * <li>application/xml</li> + * <li>video/mp4</li> + * <li>video/x-m4v</li> + * <li>video/mp4a-latm</li> + * <li>video/quicktime</li> + * <li>video/mpeg</li> + * <li>audio/wav</li> + * <li>audio/mp3</li> + * <li>image/bmp</li> + * <li>image/png</li> + * <li>image/jpg</li> + * <li>image/gif</li> + * <li>image/tif</li> + * <li>application/vnd.ms-works</li> + * <li>application/msexcel</li> + * <li>application/mspowerpoint</li> + * <li>application/msaccess</li> + * <li>application/x-ms-wmv</li> + * <li>application/pdf</li> + * <li>application/x-rpm</li> + * <li>application/tar</li> + * <li>application/x-7z-compressed</li> + * <li>application/java-archive</li> + * <li>application/zip</li> + * <li>application/x-lzh</li> + * </ul> + * </p> + */ + @EventDriven + @SideEffectFree + @SupportsBatching + @Tags({"compression", "gzip", "bzip2", "zip", "MIME", "mime.type", "file", "identify"}) + @CapabilityDescription("Attempts to identify the MIME Type used for a FlowFile. If the MIME Type can be identified, " + + "an attribute with the name 'mime.type' is added with the value being the MIME Type. If the MIME Type cannot be determined, " + + "the value will be set to 'application/octet-stream'. Some MIME Types require reading a significant amount of data; for these MIME Types, their identification " + + "is optional. 
The algorithm may have to read the entire contents of the file for each type of identification.") + public class IdentifyMimeType extends AbstractProcessor { + + public static final PropertyDescriptor IDENTIFY_ZIP = new PropertyDescriptor.Builder() + .name("Identify ZIP") + .description("Determines whether or not to attempt in depth identification of ZIP MIME types") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + public static final PropertyDescriptor IDENTIFY_TAR = new PropertyDescriptor.Builder() + .name("Identify TAR") + .description("Determines whether or not to attempt in depth identification of TAR MIME types") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles are routed to success").build(); + + private Set<Relationship> relationships; + private List<PropertyDescriptor> properties; + + private final List<MagicHeader> magicHeaders; + private final List<MagicHeader> zipMagicHeaders; + private final List<MagicHeader> tarMagicHeaders; + private final List<ContentScanningMimeTypeIdentifier> contentScanners; + private final int magicHeaderMaxLength; + + public IdentifyMimeType() { + // compile a list of Magic Header detectors + final List<MagicHeader> headers = new ArrayList<>(); + headers.add(new SimpleMagicHeader("application/gzip", new byte[]{0x1f, (byte) 0x8b})); + headers.add(new SimpleMagicHeader("application/bzip2", new byte[]{0x42, 0x5a})); + headers.add(new SimpleMagicHeader("application/flowfile-v3", FlowFilePackagerV3.MAGIC_HEADER)); + headers.add(new SimpleMagicHeader("application/xml", new byte[]{0x3c, 0x3f, 0x78, 0x6d, 0x6c, 0x20})); + headers.add(new SimpleMagicHeader("video/mp4", new byte[]{0, 0, 0, 0x14, 0x66, 0x74, 0x79, 0x70, 0x69, 0x73, 0x6F, 0x6D})); + headers.add(new SimpleMagicHeader("video/mp4", new byte[]{0, 0, 0, 0x14, 0x66, 0x74, 
0x79, 0x70, 0x33, 0x67, 0x70, 0x35})); + headers.add(new SimpleMagicHeader("video/mp4", new byte[]{0, 0, 0, 0x14, 0x66, 0x74, 0x79, 0x70, 0X4d, 0X53, 0X4e, 0X56, 0X01, 0X29, 0, 0X46, 0X4d, 0X53, 0X4e, 0X56, 0X6d, 0X70, 0X34, 0X32})); + headers.add(new SimpleMagicHeader("video/x-m4v", new byte[]{0, 0, 0, 0x18, 0x66, 0x74, 0x79, 0x70, 0x6D, 0x70, 0x34, 0x32})); + headers.add(new SimpleMagicHeader("video/mp4a-latm", new byte[]{0, 0, 0, 0x18, 0x66, 0x74, 0x79, 0x70, 0x4D, 0x34, 0x41, 0x20})); + headers.add(new SimpleMagicHeader("video/quicktime", new byte[]{0, 0, 0, 0x14, 0x66, 0x74, 0x79, 0x70, 0x71, 0x74, 0x20, 0x20})); + headers.add(new SimpleMagicHeader("video/quicktime", new byte[]{0x6D, 0x6F, 0x6F, 0x76}, 4)); + headers.add(new SimpleMagicHeader("audio/mp3", new byte[]{0x49, 0x44, 0x33})); + headers.add(new SimpleMagicHeader("image/bmp", new byte[]{0x42, 0x4D})); + headers.add(new SimpleMagicHeader("image/png", new byte[]{(byte) 0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A})); + headers.add(new SimpleMagicHeader("image/jpg", new byte[]{(byte) 0xFF, (byte) 0xD8, (byte) 0xFF})); + headers.add(new SimpleMagicHeader("image/gif", new byte[]{0x47, 0x49, 0x46, 0x38, 0x37, 0x61})); + headers.add(new SimpleMagicHeader("image/gif", new byte[]{0x47, 0x49, 0x46, 0x38, 0x39, 0x61})); + headers.add(new SimpleMagicHeader("image/tif", new byte[]{0x49, 0x20, 0x49})); + headers.add(new SimpleMagicHeader("image/tif", new byte[]{0x49, 0x49, 0x2A, 0x00})); + headers.add(new SimpleMagicHeader("image/tif", new byte[]{0x4D, 0x4D, 0x00, 0x2A})); + headers.add(new SimpleMagicHeader("image/tif", new byte[]{0x4D, 0x4D, 0x00, 0x2B})); + headers.add(new SimpleMagicHeader("application/vnd.ms-works", new byte[]{(byte) 0xFF, 0x00, 0x02, 0x00, 0x04, 0x04, 0x05, 0x54, 0x02, 0x00})); + headers.add(new SimpleMagicHeader("application/msexcel", new byte[]{0x09, 0x08, 0x10, 0, 0, 0x06, 0x05, 0}, 512)); + headers.add(new SimpleMagicHeader("application/mspowerpoint", new byte[]{0x00, 0x6E, 0x1E, (byte) 
0xF0}, 512)); + headers.add(new SimpleMagicHeader("application/mspowerpoint", new byte[]{0x0F, 0x00, (byte) 0xE8, 0x03}, 512)); + headers.add(new SimpleMagicHeader("application/mspowerpoint", new byte[]{(byte) 0xA0, 0x46, 0x1D, (byte) 0xF0}, 512)); + headers.add(new CompoundMagicHeader("application/mspowerpoint", + new SimpleMagicHeader("", new byte[]{(byte) 0xFD, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF}, 512), + new SimpleMagicHeader("", new byte[]{0x00, 0x00, 0x00}, 517))); + headers.add(new SimpleMagicHeader("application/msaccess", new byte[]{0x00, 0x01, 0x00, 0x00, 0x53, 0x74, 0x61, 0x6E, 0x64, 0x61, 0x72, 0x64, 0x20, 0x41, 0x43, 0x45, 0x20, 0x44, 0x42})); + headers.add(new SimpleMagicHeader("application/msaccess", new byte[]{0x00, 0x01, 0x00, 0x00, 0x53, 0x74, 0x61, 0x6E, 0x64, 0x61, 0x72, 0x64, 0x20, 0x4A, 0x65, 0x74, 0x20, 0x44, 0x42})); + for (byte b : new byte[]{0x10, 0x1F, 0x22, 0x23, 0x28, 0x29}) { + headers.add(new SimpleMagicHeader("application/msaccess", new byte[]{(byte) 0xFD, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, b, 0x00}, 512)); + headers.add(new SimpleMagicHeader("application/msaccess", new byte[]{(byte) 0xFD, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, b, 0x02}, 512)); + } + headers.add(new SimpleMagicHeader("application/x-ms-wmv", new byte[]{0x30, 0x26, (byte) 0xB2, 0x75, (byte) 0x8E, 0x66, (byte) 0xCF, 0x11, (byte) 0xA6, (byte) 0xD9, 0x00, (byte) 0xAA, 0x00, 0x62, (byte) 0xCE, 0x6C})); + headers.add(new SimpleMagicHeader("application/pdf", new byte[]{0x25, 0x50, 0x44, 0x46})); + headers.add(new SimpleMagicHeader("application/x-rpm", new byte[]{(byte) 0xED, (byte) 0xAB, (byte) 0xEE, (byte) 0xDB})); + headers.add(new SimpleMagicHeader("application/x-7z-compressed", new byte[]{0x37, 0x7A, (byte) 0xBC, (byte) 0xAF, 0x27, 0x1C})); + headers.add(new SimpleMagicHeader("application/java-archive", new byte[]{0x4A, 0x41, 0x52, 0x43, 0x53, 0x00})); + headers.add(new SimpleMagicHeader("application/java-archive", new byte[]{0x50, 0x4B, 0x03, 0x04, 0x14, 0x00, 
0x08})); + headers.add(new SimpleMagicHeader("application/java-archive", new byte[]{0x50, 0x4B, 0x03, 0x04, (byte) 0xA0, 0x00, 0x00})); + headers.add(new SimpleMagicHeader("application/x-lzh", new byte[]{0x2D, 0x6C, 0x68}, 2)); + headers.add(new CompoundMagicHeader("audio/wav", + new SimpleMagicHeader("", new byte[]{0x52, 0x49, 0x46, 0x46}), + new SimpleMagicHeader("", new byte[]{0x57, 0x41, 0x56, 0x45, 0x66, 0x6D, 0x74, 0x20}, 8))); + for (int nibble = 0xB0; nibble <= 0xBF; nibble++) { + headers.add(new SimpleMagicHeader("video/mpeg", new byte[]{0x00, 0x00, 0x01, (byte) nibble})); + } + this.magicHeaders = Collections.unmodifiableList(headers); + + // additional Magic Header detectors that will be turned off based on property settings + final List<MagicHeader> zipHeaders = new ArrayList<>(); + zipHeaders.add(new SimpleMagicHeader("application/zip", new byte[]{0x50, 0x4B, 0x03, 0x04})); + this.zipMagicHeaders = Collections.unmodifiableList(zipHeaders); + final List<MagicHeader> tarHeaders = new ArrayList<>(); + tarHeaders.add(new SimpleMagicHeader("application/tar", new byte[]{0x75, 0x73, 0x74, 0x61, 0x72}, 257)); + this.tarMagicHeaders = Collections.unmodifiableList(tarHeaders); + + // determine the max length that we need to buffer for magic headers + int max = 0; + for (final MagicHeader header : magicHeaders) { + max = Math.max(max, header.getRequiredBufferLength()); + } + for (final MagicHeader header : zipMagicHeaders) { + max = Math.max(max, header.getRequiredBufferLength()); + } + for (final MagicHeader header : tarMagicHeaders) { + max = Math.max(max, header.getRequiredBufferLength()); + } + this.magicHeaderMaxLength = max; + + // create list of Content Scanners + final List<ContentScanningMimeTypeIdentifier> scanningIdentifiers = new ArrayList<>(); + scanningIdentifiers.add(new ZipIdentifier()); + scanningIdentifiers.add(new TarIdentifier()); + this.contentScanners = Collections.unmodifiableList(scanningIdentifiers); + } + + @Override + protected void 
init(final ProcessorInitializationContext context) { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(IDENTIFY_ZIP); + properties.add(IDENTIFY_TAR); + this.properties = Collections.unmodifiableList(properties); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ProcessorLog logger = getLogger(); + final boolean identifyZip = context.getProperty(IDENTIFY_ZIP).asBoolean(); + final boolean identifyTar = context.getProperty(IDENTIFY_TAR).asBoolean(); + + final ObjectHolder<String> mimeTypeRef = new ObjectHolder<>(null); + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream stream) throws IOException { + try (final InputStream in = new BufferedInputStream(stream)) { + // read in up to magicHeaderMaxLength bytes + in.mark(magicHeaderMaxLength); + byte[] header = new byte[magicHeaderMaxLength]; + for (int i = 0; i < header.length; i++) { + final int next = in.read(); + if (next >= 0) { + header[i] = (byte) next; + } else if (i == 0) { + header = new byte[0]; + } else { + final byte[] newBuffer = new byte[i - 1]; + System.arraycopy(header, 0, newBuffer, 0, i - 1); + header = newBuffer; + break; + } + } + in.reset(); + + for (final MagicHeader magicHeader : magicHeaders) { + if (magicHeader.matches(header)) { + mimeTypeRef.set(magicHeader.getMimeType()); + return; + } + } + + if (!identifyZip) { + for (final MagicHeader magicHeader : zipMagicHeaders) { + if (magicHeader.matches(header)) { + 
mimeTypeRef.set(magicHeader.getMimeType()); + return; + } + } + } + + if (!identifyTar) { + for (final MagicHeader magicHeader : tarMagicHeaders) { + if (magicHeader.matches(header)) { + mimeTypeRef.set(magicHeader.getMimeType()); + return; + } + } + } + } + } + }); + + String mimeType = mimeTypeRef.get(); + if (mimeType == null) { + for (final ContentScanningMimeTypeIdentifier scanningIdentifier : this.contentScanners) { + if (scanningIdentifier.isEnabled(context)) { + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + String mimeType = scanningIdentifier.getMimeType(in); + if (mimeType != null) { + mimeTypeRef.set(mimeType); + } + } + }); + + if (mimeTypeRef.get() != null) { + break; + } + } + } + } + + mimeType = mimeTypeRef.get(); + if (mimeType == null) { + flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/octet-stream"); + logger.info("Unable to identify MIME Type for {}; setting to application/octet-stream", new Object[]{flowFile}); + } else { + flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), mimeType); + logger.info("Identified {} as having MIME Type {}", new Object[]{flowFile, mimeType}); + } + + session.getProvenanceReporter().modifyAttributes(flowFile); + session.transfer(flowFile, REL_SUCCESS); + } + + private static interface ContentScanningMimeTypeIdentifier { + + boolean isEnabled(ProcessContext context); + + String getMimeType(InputStream in) throws IOException; + } + + private static class ZipIdentifier implements ContentScanningMimeTypeIdentifier { + + @Override + public String getMimeType(final InputStream in) throws IOException { + final ZipInputStream zipIn = new ZipInputStream(in); + try { + if (zipIn.getNextEntry() != null) { + return "application/zip"; + } + } catch (final Exception e) { + } + return null; + } + + @Override + public boolean isEnabled(final ProcessContext context) { + return 
context.getProperty(IDENTIFY_ZIP).asBoolean(); + } + } + + private static class TarIdentifier implements ContentScanningMimeTypeIdentifier { + + @Override + public String getMimeType(final InputStream in) throws IOException { + try (final TarArchiveInputStream tarIn = new TarArchiveInputStream(in)) { + final TarArchiveEntry firstEntry = tarIn.getNextTarEntry(); + if (firstEntry != null) { + if (firstEntry.getName().equals(FlowFilePackagerV1.FILENAME_ATTRIBUTES)) { + final TarArchiveEntry secondEntry = tarIn.getNextTarEntry(); + if (secondEntry != null && secondEntry.getName().equals(FlowFilePackagerV1.FILENAME_CONTENT)) { + return "application/flowfile-v1"; + } + } + return "application/tar"; + } + } catch (final Exception e) { + } + return null; + } + + @Override + public boolean isEnabled(final ProcessContext context) { + return context.getProperty(IDENTIFY_TAR).asBoolean(); + } + } + + private static interface MagicHeader { + + int getRequiredBufferLength(); + + String getMimeType(); + + boolean matches(final byte[] header); + } + + private static class SimpleMagicHeader implements MagicHeader { + + private final String mimeType; + private final int offset; + private final byte[] byteSequence; + + public SimpleMagicHeader(final String mimeType, final byte[] byteSequence) { + this(mimeType, byteSequence, 0); + } + + public SimpleMagicHeader(final String mimeType, final byte[] byteSequence, final int offset) { + this.mimeType = mimeType; + this.byteSequence = byteSequence; + this.offset = offset; + } + + @Override + public int getRequiredBufferLength() { + return byteSequence.length + offset; + } + + @Override + public String getMimeType() { + return mimeType; + } + + @Override + public boolean matches(final byte[] header) { + if (header.length < getRequiredBufferLength()) { + return false; + } + + for (int i = 0; i < byteSequence.length; i++) { + if (byteSequence[i] != header[offset + i]) { + return false; + } + } + + return true; + } + } + + private static class 
CompoundMagicHeader implements MagicHeader {

        /** MIME type reported when every component header matches. */
        private final String mimeType;
        /** Component headers; all of them must match for this header to match. */
        private final MagicHeader[] components;
        /** Largest buffer length required by any single component. */
        private final int requiredLength;

        /**
         * @param mimeType MIME type to report on a match
         * @param headers  component headers that must all match
         */
        public CompoundMagicHeader(final String mimeType, final MagicHeader... headers) {
            this.mimeType = mimeType;
            this.components = headers;

            int longest = 0;
            for (final MagicHeader component : headers) {
                final int needed = component.getRequiredBufferLength();
                if (needed > longest) {
                    longest = needed;
                }
            }
            this.requiredLength = longest;
        }

        @Override
        public int getRequiredBufferLength() {
            return requiredLength;
        }

        @Override
        public String getMimeType() {
            return mimeType;
        }

        /** Matches only when every component matches; an empty component list matches anything. */
        @Override
        public boolean matches(final byte[] header) {
            int index = 0;
            while (index < components.length && components[index].matches(header)) {
                index++;
            }
            return index == components.length;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java index 0000000,e5c1ed8..41ecd91 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java @@@ -1,0 -1,758 +1,758 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.standard; + + import static org.apache.commons.lang3.StringUtils.*; + + import java.io.BufferedInputStream; + import java.io.BufferedOutputStream; + import java.io.ByteArrayOutputStream; + import java.io.IOException; + import java.io.InputStream; + import java.io.OutputStream; + import java.net.HttpURLConnection; + import java.net.URL; + import java.nio.charset.Charset; + import java.util.ArrayList; + import java.util.Arrays; + import java.util.Collection; + import java.util.Collections; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Locale; + import java.util.Map; + import java.util.Set; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicLong; + import java.util.concurrent.atomic.AtomicReference; + import java.util.regex.Matcher; + import java.util.regex.Pattern; + + import javax.net.ssl.HostnameVerifier; + import javax.net.ssl.HttpsURLConnection; + import javax.net.ssl.SSLContext; + import javax.net.ssl.SSLSession; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.SupportsBatching; -import 
org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.util.StandardValidators; + import org.apache.nifi.ssl.SSLContextService; + import org.apache.nifi.ssl.SSLContextService.ClientAuth; + import org.apache.commons.lang3.StringUtils; + import org.joda.time.format.DateTimeFormat; + import org.joda.time.format.DateTimeFormatter;

/**
 * HTTP client processor. For every incoming FlowFile it issues one HTTP
 * request (method and URL come from expression-language-enabled properties),
 * optionally sends matching FlowFile attributes as request headers, and routes
 * by response status: 2xx to "Original" plus a cloned "Response" FlowFile
 * carrying the body, 5xx to "Retry", other codes to "No Retry", and any
 * exception to "Failure".
 */
@SupportsBatching
@Tags({"http", "https", "rest", "client"})
@CapabilityDescription("An HTTP client processor which converts FlowFile attributes to HTTP headers, with configurable HTTP method, url, etc.")
public final class InvokeHTTP extends AbstractProcessor {

    //-- properties --//
    /**
     * Returns the static property list with an SSL Context Service selector
     * inserted at index 2, whose allowable values are the identifiers of the
     * currently known {@link SSLContextService} instances.
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        Set<String> contextIdentifiers = getControllerServiceLookup().getControllerServiceIdentifiers(SSLContextService.class);

        PropertyDescriptor contextServiceSelector = new PropertyDescriptor.Builder()
                .fromPropertyDescriptor(Config.PROP_SSL_CONTEXT_SERVICE)
                .allowableValues(contextIdentifiers)
                .build();

        List<PropertyDescriptor> list = new ArrayList<>(Config.PROPERTIES);
        list.add(2, contextServiceSelector);

        return Collections.unmodifiableList(list);
    }

    /**
     * Recognizes the "Trusted Hostname" dynamic property (case-insensitively);
     * all other dynamic property names are delegated to the superclass.
     */
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        if (Config.PROP_TRUSTED_HOSTNAME.getName().equalsIgnoreCase(propertyDescriptorName)) {
            return Config.PROP_TRUSTED_HOSTNAME;
        }
        return super.getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
    }

    //-- relationships --//
    @Override
    public Set<Relationship> getRelationships() {
        return Config.RELATIONSHIPS;
    }

    //-- class properties --//
    final AtomicReference<SSLContext> sslContextRef = new AtomicReference<>();
    final AtomicReference<Pattern> attributesToSendRef = new AtomicReference<>();

    /**
     * Caches derived state when configuration changes: the SSLContext built
     * from the selected SSL Context Service, and the compiled
     * attributes-to-send pattern. An empty value clears the cached state.
     */
    @Override
    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        newValue = StringUtils.trimToEmpty(newValue);

        // listen for the SSL Context Service property and retrieve the SSLContext from the controller service.
        if (Config.PROP_SSL_CONTEXT_SERVICE.getName().equalsIgnoreCase(descriptor.getName())) {
            if (newValue.isEmpty()) {
                sslContextRef.set(null);
            } else {
                SSLContextService svc = (SSLContextService) getControllerServiceLookup().getControllerService(newValue);
                if (svc == null) {
                    // The identifier may not resolve (existence is presumably enforced by the
                    // property's validator, not here); clear any stale context instead of
                    // throwing a NullPointerException from this callback.
                    sslContextRef.set(null);
                    getLogger().warn("No SSL Context Service found with identifier {}", new Object[]{newValue});
                } else {
                    sslContextRef.set(svc.createSSLContext(ClientAuth.NONE)); // ClientAuth is only useful for servers, not clients.
                    getLogger().info("Loading SSL configuration from keystore={} and truststore={}", new Object[]{svc.getKeyStoreFile(), svc.getTrustStoreFile()});
                }
            }
        }

        // compile the attributes-to-send filter pattern
        if (Config.PROP_ATTRIBUTES_TO_SEND.getName().equalsIgnoreCase(descriptor.getName())) {
            if (newValue.isEmpty()) {
                attributesToSendRef.set(null);
            } else {
                attributesToSendRef.set(Pattern.compile(newValue));
            }
        }

    }

    //-- processing --//
    /**
     * Pulls up to {@link Config#MAX_RESULTS_PER_THREAD} FlowFiles and runs one
     * {@link Transaction} per FlowFile; yields when the queue is empty.
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        List<FlowFile> flowfiles = session.get(Config.MAX_RESULTS_PER_THREAD);
        if (flowfiles.isEmpty()) {
            context.yield();
            return;
        }

        for (FlowFile flowfile : flowfiles) {
            Transaction transaction = new Transaction(
                    getLogger(), sslContextRef, attributesToSendRef, context, session, flowfile
            );
            transaction.process();
        }
    }

    /**
     *
     * Stores properties, relationships, configuration values, hard coded
     * strings, magic numbers, etc.
     *
     *
     */
    public interface Config {

        //-- magic numbers --//
        int MAX_RESULTS_PER_THREAD = 50;

        //-- flowfile attribute keys returned after reading the response --//
        String STATUS_CODE = "invokehttp.status.code";
        String STATUS_MESSAGE = "invokehttp.status.message";
        String RESPONSE_BODY = "invokehttp.response.body";
        String REQUEST_URL = "invokehttp.request.url";
        String TRANSACTION_ID = "invokehttp.tx.id";
        String REMOTE_DN = "invokehttp.remote.dn";

        // Set of flowfile attributes which we generally always ignore during
        // processing, including when converting http headers, copying attributes, etc.
        // This set includes our strings defined above as well as some standard flowfile
        // attributes.
        Set<String> IGNORED_ATTRIBUTES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
                STATUS_CODE, STATUS_MESSAGE, RESPONSE_BODY, REQUEST_URL, TRANSACTION_ID, REMOTE_DN,
                "uuid", "filename", "path"
        )));

        //-- properties --//
        PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
                .name("HTTP Method")
                .description("HTTP request method (GET, POST, PUT, DELETE, HEAD, OPTIONS).")
                .required(true)
                .defaultValue("GET")
                .expressionLanguageSupported(true)
                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                .build();

        PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder()
                .name("Remote URL")
                .description("Remote URL which will be connected to, including scheme, host, port, path.")
                .required(true)
                .expressionLanguageSupported(true)
                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                .build();

        PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
                .name("Connection Timeout")
                .description("Max wait time for connection to remote service.")
                .required(true)
                .defaultValue("5 secs")
                .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
                .build();

        PropertyDescriptor PROP_READ_TIMEOUT = new PropertyDescriptor.Builder()
                .name("Read Timeout")
                .description("Max wait time for response from remote service.")
                .required(true)
                .defaultValue("15 secs")
                .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
                .build();

        PropertyDescriptor PROP_DATE_HEADER = new PropertyDescriptor.Builder()
                .name("Include Date Header")
                .description("Include an RFC-2616 Date header in the request.")
                .required(true)
                .defaultValue("True")
                .allowableValues("True", "False")
                .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
                .build();

        PropertyDescriptor PROP_FOLLOW_REDIRECTS = new PropertyDescriptor.Builder()
                .name("Follow Redirects")
                .description("Follow HTTP redirects issued by remote server.")
                .required(true)
                .defaultValue("True")
                .allowableValues("True", "False")
                .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
                .build();

        PropertyDescriptor PROP_ATTRIBUTES_TO_SEND = new PropertyDescriptor.Builder()
                .name("Attributes to Send")
                .description("Regular expression that defines which attributes to send as HTTP headers in the request. "
                        + "If not defined, no attributes are sent as headers.")
                .required(false)
                .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
                .build();

        List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
                PROP_METHOD,
                PROP_URL,
                PROP_CONNECT_TIMEOUT,
                PROP_READ_TIMEOUT,
                PROP_DATE_HEADER,
                PROP_FOLLOW_REDIRECTS,
                PROP_ATTRIBUTES_TO_SEND
        ));

        // The allowableValues of the SSL Context Service property is dynamically populated at run time.
        PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
                .name("SSL Context Service")
                .description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.")
                .required(false)
                .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
                .build();

        // property to allow the hostname verifier to be overridden
        // this is a "hidden" property - it's configured using a dynamic user property
        PropertyDescriptor PROP_TRUSTED_HOSTNAME = new PropertyDescriptor.Builder()
                .name("Trusted Hostname")
                .description("Bypass the normal truststore hostname verifier to allow the specified (single) remote hostname as trusted. "
                        + "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.")
                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                .dynamic(true)
                .build();

        //-- relationships --//
        Relationship REL_SUCCESS_REQ = new Relationship.Builder()
                .name("Original")
                .description("Original FlowFile will be routed upon success (2xx status codes).")
                .build();

        Relationship REL_SUCCESS_RESP = new Relationship.Builder()
                .name("Response")
                .description("Response FlowFile will be routed upon success (2xx status codes).")
                .build();

        Relationship REL_RETRY = new Relationship.Builder()
                .name("Retry")
                .description("FlowFile will be routed on any status code that can be retried (5xx status codes).")
                .build();

        Relationship REL_NO_RETRY = new Relationship.Builder()
                .name("No Retry")
                .description("FlowFile will be routed on any status code that should NOT be retried (1xx, 3xx, 4xx status codes).")
                .build();

        Relationship REL_FAILURE = new Relationship.Builder()
                .name("Failure")
                .description("FlowFile will be routed on any type of connection failure, timeout or general exception.")
                .build();

        Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
                REL_SUCCESS_REQ, REL_SUCCESS_RESP, REL_RETRY, REL_NO_RETRY, REL_FAILURE
        )));

    }

    /**
     * A single invocation of an HTTP request/response from the InvokeHTTP
     * processor. This class encapsulates the entirety of the flowfile
     * processing.
     * <p>
     * This class is not thread safe and is created new for every flowfile
     * processed.
     */
    private static class Transaction implements Config {

        /**
         * Pattern used to compute RFC 2616 Dates (#sec3.3.1). This format is
         * used by the HTTP Date header and is optionally sent by the processor.
         * This date is effectively an RFC 822/1123 date string, but HTTP
         * requires it to be in GMT (preferring the literal 'GMT' string).
         */
        private static final String rfc1123 = "EEE, dd MMM yyyy HH:mm:ss 'GMT'";
        private static final DateTimeFormatter dateFormat = DateTimeFormat.forPattern(rfc1123).withLocale(Locale.US).withZoneUTC();

        /**
         * Every request/response cycle from this client has a unique
         * transaction id which will be stored as a flowfile attribute. This
         * generator is used to create the id.
         */
        private static final AtomicLong txIdGenerator = new AtomicLong();

        private static final Charset utf8 = Charset.forName("UTF-8");

        private final ProcessorLog logger;
        private final AtomicReference<SSLContext> sslContextRef;
        private final AtomicReference<Pattern> attributesToSendRef;
        private final ProcessContext context;
        private final ProcessSession session;

        private final long txId = txIdGenerator.incrementAndGet();

        private FlowFile request;
        private FlowFile response;
        private HttpURLConnection conn;

        private int statusCode;
        private String statusMessage;

        public Transaction(
                ProcessorLog logger,
                AtomicReference<SSLContext> sslContextRef,
                AtomicReference<Pattern> attributesToSendRef,
                ProcessContext context,
                ProcessSession session,
                FlowFile request) {

            this.logger = logger;
            this.sslContextRef = sslContextRef;
            this.attributesToSendRef = attributesToSendRef;
            this.context = context;
            this.session = session;
            this.request = request;
        }

        /**
         * Runs the full request/response cycle. Any Throwable penalizes the
         * request FlowFile, routes it to {@code REL_FAILURE}, and removes the
         * response FlowFile if one was already created.
         */
        public void process() {

            try {
                openConnection();
                sendRequest();
                readResponse();
                transfer();

            } catch (Throwable t) {
                // log exception
                logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), t}, t);

                // penalize
                request = session.penalize(request);

                // transfer original to failure
                session.transfer(request, REL_FAILURE);

                // cleanup response flowfile, if applicable
                try {
                    if (response != null) {
                        session.remove(response);
                    }
                } catch (Throwable t1) {
                    logger.error("Could not cleanup response flowfile due to exception: {}", new Object[]{t1}, t1);
                }

            }
        }

        /** Opens and configures the connection (method, timeouts, redirects, TLS). */
        private void openConnection() throws IOException {
            // read the url property from the context
            String urlstr = trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(request).getValue());
            URL url = new URL(urlstr);

            // create the connection
            conn = (HttpURLConnection) url.openConnection();

            // set the request method; Locale.ROOT keeps upper-casing locale-independent
            // (e.g. under a Turkish default locale "options" would become "OPTİONS")
            String method = trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(request).getValue()).toUpperCase(Locale.ROOT);
            conn.setRequestMethod(method);

            // set timeouts
            conn.setConnectTimeout(context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
            conn.setReadTimeout(context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());

            // set whether to follow redirects
            conn.setInstanceFollowRedirects(context.getProperty(PROP_FOLLOW_REDIRECTS).asBoolean());

            // special handling for https
            if (conn instanceof HttpsURLConnection) {
                HttpsURLConnection sconn = (HttpsURLConnection) conn;

                // check if the ssl context is set
                SSLContext sslContext = sslContextRef.get();
                if (sslContext != null) {
                    sconn.setSSLSocketFactory(sslContext.getSocketFactory());
                }

                // check the trusted hostname property and override the HostnameVerifier
                String trustedHostname = trimToEmpty(context.getProperty(PROP_TRUSTED_HOSTNAME).getValue());
                if (!trustedHostname.isEmpty()) {
                    sconn.setHostnameVerifier(new OverrideHostnameVerifier(trustedHostname, sconn.getHostnameVerifier()));
                }
            }

        }

        /** Sets request headers and, for POST/PUT only, streams the FlowFile content as the body. */
        private void sendRequest() throws IOException {
            // set the http request properties using flowfile attribute values
            setRequestProperties();

            // log request
            logRequest();

            // we only stream data for POST and PUT requests
            String method = conn.getRequestMethod().toUpperCase(Locale.ROOT);
            if ("POST".equals(method) || "PUT".equals(method)) {
                conn.setDoOutput(true);
                conn.setFixedLengthStreamingMode(request.getSize());

                // write the flowfile contents to the output stream
                try (OutputStream os = new BufferedOutputStream(conn.getOutputStream())) {
                    session.exportTo(request, os);
                }

                // emit provenance event
                session.getProvenanceReporter().send(request, conn.getURL().toExternalForm());
            }

        }

        /**
         * Records the status on the request FlowFile. On failure statuses the
         * response body is stored in the {@code RESPONSE_BODY} attribute; on
         * success a cloned response FlowFile receives the headers and body.
         */
        private void readResponse() throws IOException {

            // output the raw response headers (DEBUG level only)
            logResponse();

            // store the status code and message
            statusCode = conn.getResponseCode();
            statusMessage = conn.getResponseMessage();

            // always write the status attributes to the request flowfile
            request = writeStatusAttributes(request);

            // read from the appropriate input stream
            try (InputStream is = getResponseStream()) {

                if (isSuccess()) {
                    // if successful, store the response body as the flowfile payload;
                    // we include additional flowfile attributes including the response
                    // headers and the status codes.

                    // clone the flowfile to capture the response
                    response = session.clone(request);

                    // write the status attributes
                    response = writeStatusAttributes(response);

                    // write the response headers as attributes
                    // this will overwrite any existing flowfile attributes
                    response = session.putAllAttributes(response, convertAttributesFromHeaders());

                    // transfer the message body to the payload
                    // can potentially be null in edge cases
                    if (is != null) {
                        response = session.importFrom(is, response);
                    }

                    // invoke provenance events
                    session.getProvenanceReporter().receive(response, conn.getURL().toExternalForm());
                } else {
                    // if not successful, store the response body into a flowfile attribute
                    String body = trimToEmpty(toString(is, utf8));
                    request = session.putAttribute(request, RESPONSE_BODY, body);
                }

            }

        }

        /** Routes the FlowFile(s) by status class, penalizing the request on non-2xx. */
        private void transfer() throws IOException {
            // check if we should penalize the request
            if (!isSuccess()) {
                request = session.penalize(request);
            }

            // log the status codes from the response
            logger.info("Request to {} returned status code {} for {}", new Object[]{
                conn.getURL().toExternalForm(),
                statusCode,
                request
            });

            // transfer to the correct relationship
            // 2xx -> SUCCESS
            if (isSuccess()) {
                // we have two flowfiles to transfer
                session.transfer(request, REL_SUCCESS_REQ);
                session.transfer(response, REL_SUCCESS_RESP);

                // 5xx -> RETRY
            } else if (statusCode / 100 == 5) {
                session.transfer(request, REL_RETRY);

                // 1xx, 3xx, 4xx -> NO RETRY
            } else {
                session.transfer(request, REL_NO_RETRY);
            }

        }

        /** Adds the optional Date header and any FlowFile attributes matching the attributes-to-send pattern. */
        private void setRequestProperties() {

            // check if we should send a Date header with the request
            if (context.getProperty(PROP_DATE_HEADER).asBoolean()) {
                conn.setRequestProperty("Date", getDateValue());
            }

            // iterate through the flowfile attributes, adding any attribute that
            // matches the attributes-to-send pattern. if the pattern is not set
            // (it's an optional property), ignore that attribute entirely
            Pattern p = attributesToSendRef.get();
            if (p != null) {
                Map<String, String> attributes = request.getAttributes();
                Matcher m = p.matcher("");
                for (Map.Entry<String, String> entry : attributes.entrySet()) {
                    String key = trimToEmpty(entry.getKey());
                    String val = trimToEmpty(entry.getValue());

                    // don't include any of the ignored attributes
                    if (IGNORED_ATTRIBUTES.contains(key)) {
                        continue;
                    }

                    // check if our attribute key matches the pattern
                    // if so, include in the request as a header
                    m.reset(key);
                    if (m.matches()) {
                        conn.setRequestProperty(key, val);
                    }
                }
            }
        }

        /**
         * Returns a Map of flowfile attributes from the response http headers.
         * Multivalue headers are naively converted to comma separated strings.
         * For HTTPS connections the peer principal name is added under
         * {@code REMOTE_DN}.
         */
        private Map<String, String> convertAttributesFromHeaders() throws IOException {
            // create a new hashmap to store the values from the connection
            Map<String, String> map = new HashMap<>();
            for (Map.Entry<String, List<String>> entry : conn.getHeaderFields().entrySet()) {
                String key = entry.getKey();
                if (key == null) {
                    continue;
                }

                List<String> values = entry.getValue();

                // we ignore any headers with no actual values (rare)
                if (values == null || values.isEmpty()) {
                    continue;
                }

                // create a comma separated string from the values, this is stored in the map
                String value = csv(values);

                // put the csv into the map
                map.put(key, value);
            }

            if (conn instanceof HttpsURLConnection) {
                HttpsURLConnection sconn = (HttpsURLConnection) conn;
                // this should seemingly not be required, but somehow the state of the jdk client is messed up
                // when retrieving SSL certificate related information if connect() has not been called previously.
                sconn.connect();
                map.put(REMOTE_DN, sconn.getPeerPrincipal().getName());
            }

            return map;
        }

        /**
         * @return whether the status code is 2xx
         * @throws IllegalStateException if no status code has been read yet
         */
        private boolean isSuccess() throws IOException {
            if (statusCode == 0) {
                throw new IllegalStateException("Status code unknown, connection hasn't been attempted.");
            }
            return statusCode / 100 == 2;
        }

        private void logRequest() {
            logger.debug("\nRequest to remote service:\n\t{}\n{}", new Object[]{
                conn.getURL().toExternalForm(),
                getLogString(conn.getRequestProperties())
            });
        }

        private void logResponse() {
            logger.debug("\nResponse from remote service:\n\t{}\n{}", new Object[]{
                conn.getURL().toExternalForm(),
                getLogString(conn.getHeaderFields())
            });
        }

        /** Formats header maps one "\tkey: value" line per non-empty entry. */
        private String getLogString(Map<String, List<String>> map) {
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, List<String>> entry : map.entrySet()) {
                List<String> list = entry.getValue();
                if (list.isEmpty()) {
                    continue;
                }
                sb.append("\t");
                sb.append(entry.getKey());
                sb.append(": ");
                if (list.size() == 1) {
                    sb.append(list.get(0));
                } else {
                    sb.append(list.toString());
                }
                sb.append("\n");
            }
            return sb.toString();
        }

        /**
         * Convert a collection of string values into an overly simple comma
         * separated string. Null and blank elements are skipped when there is
         * more than one value.
         *
         * Does not handle the case where the value contains the delimiter. i.e.
         * if a value contains a comma, this method does nothing to try and
         * escape or quote the value, in traditional csv style.
         */
        private String csv(Collection<String> values) {
            if (values == null || values.isEmpty()) {
                return "";
            }
            if (values.size() == 1) {
                return values.iterator().next();
            }

            StringBuilder sb = new StringBuilder();
            for (String value : values) {
                if (value == null) {
                    continue;
                }
                value = value.trim();
                if (value.isEmpty()) {
                    continue;
                }
                if (sb.length() > 0) {
                    sb.append(", ");
                }
                sb.append(value);
            }
            return sb.toString().trim();
        }

        /**
         * Return the current datetime as an RFC 1123 formatted string in the
         * GMT tz.
         */
        private String getDateValue() {
            return dateFormat.print(System.currentTimeMillis());
        }

        /**
         * Returns a string from the input stream using the specified character
         * encoding; a null stream yields the empty string.
         */
        private String toString(InputStream is, Charset charset) throws IOException {
            if (is == null) {
                return "";
            }

            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int len;
            while ((len = is.read(buf)) != -1) {
                out.write(buf, 0, len);
            }
            return new String(out.toByteArray(), charset);
        }

        /**
         * Returns the input stream to use for reading from the remote server.
         * We're either going to want the inputstream or errorstream,
         * effectively depending on the status code.
         * <p>
         * This method can return null if there is no inputstream to read from.
         * For example, if the remote server did not send a message body. eg.
         * 204 No Content or 304 Not Modified
         */
        private InputStream getResponseStream() {
            try {
                InputStream is = conn.getErrorStream();
                if (is == null) {
                    is = conn.getInputStream();
                }
                return new BufferedInputStream(is);

            } catch (IOException e) {
                logger.warn("Response stream threw an exception: {}", new Object[]{e}, e);
                return null;
            }
        }

        /**
         * Writes the status attributes onto the flowfile, returning the
         * flowfile that was updated.
         */
        private FlowFile writeStatusAttributes(FlowFile flowfile) {
            flowfile = session.putAttribute(flowfile, STATUS_CODE, String.valueOf(statusCode));
            flowfile = session.putAttribute(flowfile, STATUS_MESSAGE, statusMessage);
            flowfile = session.putAttribute(flowfile, REQUEST_URL, conn.getURL().toExternalForm());
            flowfile = session.putAttribute(flowfile, TRANSACTION_ID, Long.toString(txId));
            return flowfile;
        }

        /**
         * HostnameVerifier that accepts one configured hostname
         * (case-insensitively) and delegates every other hostname to the
         * previously installed verifier.
         */
        private static class OverrideHostnameVerifier implements HostnameVerifier {

            private final String trustedHostname;
            private final HostnameVerifier delegate;

            private OverrideHostnameVerifier(String trustedHostname, HostnameVerifier delegate) {
                this.trustedHostname = trustedHostname;
                this.delegate = delegate;
            }

            @Override
            public boolean verify(String hostname, SSLSession session) {
                if (trustedHostname.equalsIgnoreCase(hostname)) {
                    return true;
                }
                return delegate.verify(hostname, session);
            }
        }
    }

}
