http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java index 0000000,8520a55..2a81b64 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java @@@ -1,0 -1,370 +1,370 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.standard; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.stream.io.BufferedInputStream; + import org.apache.nifi.stream.io.BufferedOutputStream; + import org.apache.nifi.stream.io.ByteArrayOutputStream; + import org.apache.nifi.stream.io.ByteCountingInputStream; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.EventDriven; -import org.apache.nifi.processor.annotation.SideEffectFree; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.behavior.EventDriven; ++import org.apache.nifi.annotation.behavior.SideEffectFree; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processor.io.InputStreamCallback; + import org.apache.nifi.processor.io.OutputStreamCallback; + import org.apache.nifi.processor.util.StandardValidators; + import org.apache.nifi.util.IntegerHolder; + import org.apache.nifi.util.ObjectHolder; + + import java.io.IOException; + import java.io.InputStream; + import java.io.OutputStream; + import java.util.ArrayList; + import java.util.Collections; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.concurrent.TimeUnit; 
+ import java.util.UUID; + + @EventDriven + @SideEffectFree + @SupportsBatching + @Tags({"split", "text"}) + @CapabilityDescription("Splits a text file into multiple smaller text files on line boundaries, each having up to a configured number of lines") + public class SplitText extends AbstractProcessor { + + // attribute keys + public static final String SPLIT_LINE_COUNT = "text.line.count"; + public static final String FRAGMENT_ID = "fragment.identifier"; + public static final String FRAGMENT_INDEX = "fragment.index"; + public static final String FRAGMENT_COUNT = "fragment.count"; + public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename"; + + public static final PropertyDescriptor LINE_SPLIT_COUNT = new PropertyDescriptor.Builder() + .name("Line Split Count") + .description("The number of lines that will be added to each split file") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + public static final PropertyDescriptor HEADER_LINE_COUNT = new PropertyDescriptor.Builder() + .name("Header Line Count") + .description("The number of lines that should be considered part of the header; the header lines will be duplicated to all split files") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("0") + .build(); + public static final PropertyDescriptor REMOVE_TRAILING_NEWLINES = new PropertyDescriptor.Builder() + .name("Remove Trailing Newlines") + .description( + "Whether to remove newlines at the end of each split file. This should be false if you intend to merge the split files later") + .required(true) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original input file will be routed to this destination when it has been successfully split into 1 or more files").build(); + public static final Relationship REL_SPLITS = new Relationship.Builder().name("splits").description("The split files will be routed to this destination when an input file is successfully split into 1 or more split files").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere").build(); + + private List<PropertyDescriptor> properties; + private Set<Relationship> relationships; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(LINE_SPLIT_COUNT); + properties.add(HEADER_LINE_COUNT); + properties.add(REMOVE_TRAILING_NEWLINES); + this.properties = Collections.unmodifiableList(properties); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_ORIGINAL); + relationships.add(REL_SPLITS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + /** + * Reads up to the given maximum number of lines, copying them to out + * + * @param in + * @param maxNumLines + * @param out + * @return the number of lines actually copied + * @throws IOException 
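+ * @param keepAllNewLines if false, the line delimiter of the last line read is not copied to out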
+ */ + private int readLines(final InputStream in, final int maxNumLines, final OutputStream out, final boolean keepAllNewLines) throws IOException { + int numLines = 0; + for (int i = 0; i < maxNumLines; i++) { + final long bytes = countBytesToSplitPoint(in, out, keepAllNewLines || (i != maxNumLines - 1)); + if (bytes <= 0) { + return numLines; + } + + numLines++; + } + + return numLines; + } + + private long countBytesToSplitPoint(final InputStream in, final OutputStream out, final boolean includeLineDelimiter) throws IOException { + int lastByte = -1; + long bytesRead = 0L; + + while (true) { + in.mark(1); + final int nextByte = in.read(); + + // if we hit end of stream or new line we're done + if (nextByte == -1) { + if (lastByte == '\r') { + return includeLineDelimiter ? bytesRead : bytesRead - 1; + } else { + return bytesRead; + } + } + + // if there's an OutputStream to copy the data to, copy it, if appropriate. + // "if appropriate" means that it's not a line delimiter or that we want to copy line delimiters + bytesRead++; + if (out != null && (includeLineDelimiter || (nextByte != '\n' && nextByte != '\r'))) { + out.write(nextByte); + } + + // if we have a new line, then we're done + if (nextByte == '\n') { + if (includeLineDelimiter) { + return bytesRead; + } else { + return (lastByte == '\r') ? bytesRead - 2 : bytesRead - 1; + } + } + + // we didn't get a new line but if last byte was carriage return we've reached a new-line. + // so we roll back the last byte that we read and return + if (lastByte == '\r') { + in.reset(); + bytesRead--; // we reset the stream by 1 byte so decrement the number of bytes read by 1 + return includeLineDelimiter ? bytesRead : bytesRead - 1; + } + + // keep track of what the last byte was that we read so that we can detect \r followed by some other + // character. 
+ lastByte = nextByte; + } + } + + private SplitInfo countBytesToSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines) throws IOException { + SplitInfo info = new SplitInfo(); + + while (info.lengthLines < numLines) { + final long bytesTillNext = countBytesToSplitPoint(in, null, keepAllNewLines || (info.lengthLines != numLines - 1)); + if (bytesTillNext <= 0L) { + break; + } + + info.lengthLines++; + info.lengthBytes += bytesTillNext; + } + + return info; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ProcessorLog logger = getLogger(); + final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger(); + final int splitCount = context.getProperty(LINE_SPLIT_COUNT).asInteger(); + final boolean removeTrailingNewlines = context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean(); + + final ObjectHolder<String> errorMessage = new ObjectHolder<>(null); + final ArrayList<SplitInfo> splitInfos = new ArrayList<>(); + + final long startNanos = System.nanoTime(); + final List<FlowFile> splits = new ArrayList<>(); + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream rawIn) throws IOException { + try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn); + final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) { + + // if we have header lines, copy them into a ByteArrayOutputStream + final ByteArrayOutputStream headerStream = new ByteArrayOutputStream(); + final int headerLinesCopied = readLines(in, headerCount, headerStream, true); + if (headerLinesCopied < headerCount) { + errorMessage.set("Header Line Count is set to " + headerCount + " but file had only " + headerLinesCopied + " lines"); + return; + } + + while (true) { + if (headerCount > 0) { + // if we have header lines, create a new FlowFile, copy the header lines to that file, + // and then start copying lines + final IntegerHolder linesCopied = new IntegerHolder(0); + FlowFile splitFile = session.create(flowFile); + try { + splitFile = session.write(splitFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream rawOut) throws IOException { + try (final BufferedOutputStream out = new BufferedOutputStream(rawOut)) { + headerStream.writeTo(out); + linesCopied.set(readLines(in, splitCount, out, !removeTrailingNewlines)); + } + } + }); + splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get())); + logger.debug("Created Split File {} with {} lines", new Object[]{splitFile, linesCopied.get()}); + } finally { + if (linesCopied.get() > 0) { + splits.add(splitFile); + } else { + // if the number of content lines is a multiple of the SPLIT_LINE_COUNT, + // the last flow file will contain just a header; don't forward that one + session.remove(splitFile); + } + } + + // If we copied fewer lines than what we want, then we're done copying data (we've hit EOF). + if (linesCopied.get() < splitCount) { + break; + } + } else { + // We have no header lines, so we can simply demarcate the original File via the + // ProcessSession#clone method. 
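+ // (clone(FlowFile, offset, length) creates a child FlowFile that references a byte range of the parent's content without copying bytes, so we only record each split's offset and length here and create the clones once the read completes.)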
+ long beforeReadingLines = in.getBytesConsumed(); + final SplitInfo info = countBytesToSplitPoint(in, splitCount, !removeTrailingNewlines); + if (info.lengthBytes == 0) { + // stream is out of data + break; + } else { + info.offsetBytes = beforeReadingLines; + splitInfos.add(info); + final long procNanos = System.nanoTime() - startNanos; + final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS); + logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; total splits = {}; total processing time = {} ms", new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis}); + } + } + } + } + }); + + if (errorMessage.get() != null) { + logger.error("Unable to split {} due to {}; routing to failure", new Object[]{flowFile, errorMessage.get()}); + session.transfer(flowFile, REL_FAILURE); + if (splits != null && !splits.isEmpty()) { + session.remove(splits); + } + return; + } + + if (!splitInfos.isEmpty()) { + // Create the splits + for (final SplitInfo info : splitInfos) { + FlowFile split = session.clone(flowFile, info.offsetBytes, info.lengthBytes); + split = session.putAttribute(split, SPLIT_LINE_COUNT, String.valueOf(info.lengthLines)); + splits.add(split); + } + } + finishFragmentAttributes(session, flowFile, splits); + + if (splits.size() > 10) { + logger.info("Split {} into {} files", new Object[]{flowFile, splits.size()}); + } else { + logger.info("Split {} into {} files: {}", new Object[]{flowFile, splits.size(), splits}); + } + + session.transfer(flowFile, REL_ORIGINAL); + session.transfer(splits, REL_SPLITS); + } + + /** + * Apply split index, count and other attributes. + * + * @param session + * @param source + * @param splits + */ + private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) { + final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key()); + + final String fragmentId = UUID.randomUUID().toString(); + final ArrayList<FlowFile> newList = new ArrayList<>(splits); + splits.clear(); + for (int i = 1; i <= newList.size(); i++) { + FlowFile ff = newList.get(i - 1); + final Map<String, String> attributes = new HashMap<>(); + attributes.put(FRAGMENT_ID, fragmentId); + attributes.put(FRAGMENT_INDEX, String.valueOf(i)); + attributes.put(FRAGMENT_COUNT, String.valueOf(newList.size())); + attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFilename); + FlowFile newFF = session.putAllAttributes(ff, attributes); + splits.add(newFF); + } + } + + private class SplitInfo { + + public long offsetBytes; + public long lengthBytes; + public long lengthLines; + + public SplitInfo() { + super(); + this.offsetBytes = 0L; + this.lengthBytes = 0L; + this.lengthLines = 0L; + } + + @SuppressWarnings("unused") + public SplitInfo(long offsetBytes, long lengthBytes, long lengthLines) { + super(); + this.offsetBytes = offsetBytes; + this.lengthBytes = lengthBytes; + this.lengthLines = lengthLines; + } + } + }
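As a minimal sketch of how the SplitText processor above can be exercised with NiFi's mock framework (nifi-mock) — the property values, input content, and assertions here are illustrative assumptions, not part of this commit:

    import java.util.List;

    import org.apache.nifi.util.MockFlowFile;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class SplitTextUsageSketch {

        public static void main(final String[] args) throws Exception {
            final TestRunner runner = TestRunners.newTestRunner(new SplitText());
            runner.setProperty(SplitText.LINE_SPLIT_COUNT, "2");  // at most two content lines per split
            runner.setProperty(SplitText.HEADER_LINE_COUNT, "1"); // first line is duplicated into every split

            // one header line plus three content lines -> expect splits of 2 and 1 content lines
            runner.enqueue("header\nline1\nline2\nline3\n".getBytes("UTF-8"));
            runner.run();

            runner.assertTransferCount(SplitText.REL_SPLITS, 2);
            runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
            runner.assertTransferCount(SplitText.REL_FAILURE, 0);

            // SPLIT_LINE_COUNT counts content lines only, excluding the duplicated header
            final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
            splits.get(0).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "2");
            splits.get(1).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "1");
        }
    }
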
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java index 0000000,c5eda3d..1220b7c mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java @@@ -1,0 -1,300 +1,300 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.standard; + + import java.io.IOException; + import java.io.InputStream; + import java.io.OutputStream; + import java.util.ArrayList; + import java.util.Collections; + import java.util.HashSet; + import java.util.List; + import java.util.Set; + + import javax.xml.parsers.ParserConfigurationException; + import javax.xml.parsers.SAXParser; + import javax.xml.parsers.SAXParserFactory; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.stream.io.BufferedInputStream; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.EventDriven; -import org.apache.nifi.processor.annotation.SideEffectFree; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.behavior.EventDriven; ++import org.apache.nifi.annotation.behavior.SideEffectFree; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processor.io.InputStreamCallback; + import org.apache.nifi.processor.io.OutputStreamCallback; + import org.apache.nifi.processor.util.StandardValidators; + import org.apache.nifi.processors.standard.util.XmlElementNotifier; + import org.apache.nifi.util.BooleanHolder; + + import org.apache.commons.lang3.StringEscapeUtils; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + import org.xml.sax.Attributes; + import 
org.xml.sax.ContentHandler; + import org.xml.sax.InputSource; + import org.xml.sax.Locator; + import org.xml.sax.SAXException; + import org.xml.sax.XMLReader; + + @EventDriven + @SideEffectFree + @SupportsBatching + @Tags({"xml", "split"}) + @CapabilityDescription("Splits an XML File into multiple separate FlowFiles, each comprising a child or descendant of the original root element") + public class SplitXml extends AbstractProcessor { + + public static final PropertyDescriptor SPLIT_DEPTH = new PropertyDescriptor.Builder() + .name("Split Depth") + .description("Indicates the XML-nesting depth to start splitting XML fragments. A depth of 1 means split the root's children, whereas a depth of 2 means split the root's children's children and so forth.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("1") + .build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to this relationship").build(); + public static final Relationship REL_SPLIT = new Relationship.Builder().name("split").description("All segments of the original FlowFile will be routed to this relationship").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid XML), it will be routed to this relationship").build(); + + private List<PropertyDescriptor> properties; + private Set<Relationship> relationships; + + private static final String FEATURE_PREFIX = "http://xml.org/sax/features/"; + public static final String ENABLE_NAMESPACES_FEATURE = FEATURE_PREFIX + "namespaces"; + public static final String ENABLE_NAMESPACE_PREFIXES_FEATURE = FEATURE_PREFIX + "namespace-prefixes"; + private static final SAXParserFactory saxParserFactory = SAXParserFactory.newInstance(); + + static { + saxParserFactory.setNamespaceAware(true); + try { + saxParserFactory.setFeature(ENABLE_NAMESPACES_FEATURE, true); + saxParserFactory.setFeature(ENABLE_NAMESPACE_PREFIXES_FEATURE, true); + } catch (Exception e) { + final Logger staticLogger = LoggerFactory.getLogger(SplitXml.class); + staticLogger.warn("Unable to configure SAX Parser to make namespaces available", e); + } + } + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(SPLIT_DEPTH); + this.properties = Collections.unmodifiableList(properties); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_ORIGINAL); + relationships.add(REL_SPLIT); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + final FlowFile original = session.get(); + if (original == null) { + return; + } + + final int depth = context.getProperty(SPLIT_DEPTH).asInteger(); + final ProcessorLog logger = getLogger(); + + final List<FlowFile> splits = new ArrayList<>(); + final XmlSplitterSaxParser parser = new XmlSplitterSaxParser(new XmlElementNotifier() { + @Override + public void 
onXmlElementFound(final String xmlTree) { + FlowFile split = session.create(original); + split = session.write(split, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(xmlTree.getBytes("UTF-8")); + } + }); + splits.add(split); + } + }, depth); + + final BooleanHolder failed = new BooleanHolder(false); + session.read(original, new InputStreamCallback() { + @Override + public void process(final InputStream rawIn) throws IOException { + try (final InputStream in = new BufferedInputStream(rawIn)) { + SAXParser saxParser = null; + try { + saxParser = saxParserFactory.newSAXParser(); + final XMLReader reader = saxParser.getXMLReader(); + reader.setContentHandler(parser); + reader.parse(new InputSource(in)); + } catch (final ParserConfigurationException | SAXException e) { + logger.error("Unable to parse {} due to {}", new Object[]{original, e}); + failed.set(true); + } + } + } + }); + + if (failed.get()) { + session.transfer(original, REL_FAILURE); + session.remove(splits); + } else { + session.transfer(splits, REL_SPLIT); + session.transfer(original, REL_ORIGINAL); + logger.info("Split {} into {} FlowFiles", new Object[]{original, splits.size()}); + } + } + + private static class XmlSplitterSaxParser implements ContentHandler { + + private static final String XML_PROLOGUE = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"; + private final XmlElementNotifier notifier; + private final int splitDepth; + private final StringBuilder sb = new StringBuilder(XML_PROLOGUE); + private int depth = 0; + + public XmlSplitterSaxParser(XmlElementNotifier notifier, int splitDepth) { + this.notifier = notifier; + this.splitDepth = splitDepth; + } + + @Override + public void characters(char[] ch, int start, int length) throws SAXException { + // if we're not at a level where we care about capturing text, then return + if (depth <= splitDepth) { + return; + } + + // capture text, escaping XML special characters as entities + for (int i = start; i < start + length; i++) { + char c = ch[i]; + switch (c) { + case '<': + sb.append("&lt;"); + break; + case '>': + sb.append("&gt;"); + break; + case '&': + sb.append("&amp;"); + break; + case '\'': + sb.append("&apos;"); + break; + case '"': + sb.append("&quot;"); + break; + default: + sb.append(c); + break; + } + } + } + + @Override + public void endDocument() throws SAXException { + } + + @Override + public void endElement(String uri, String localName, String qName) throws SAXException { + // We have finished processing this element. Decrement the depth. + int newDepth = --depth; + + // if we're at a level where we care about capturing text, then add the closing element + if (newDepth >= splitDepth) { + // Add the element end tag. + sb.append("</").append(qName).append(">"); + } + + // If we have now returned to level 1, we have finished processing + // a 2nd-level element. Send notification with the XML text and + // erase the String Builder so that we can start + // processing a new 2nd-level element. + if (newDepth == splitDepth) { + String elementTree = sb.toString(); + notifier.onXmlElementFound(elementTree); + // Reset the StringBuilder to just the XML prolog.
+ sb.setLength(XML_PROLOGUE.length()); + } + } + + @Override + public void endPrefixMapping(String prefix) throws SAXException { + } + + @Override + public void ignorableWhitespace(char[] ch, int start, int length) throws SAXException { + } + + @Override + public void processingInstruction(String target, String data) throws SAXException { + } + + @Override + public void setDocumentLocator(Locator locator) { + } + + @Override + public void skippedEntity(String name) throws SAXException { + } + + @Override + public void startDocument() throws SAXException { + } + + @Override + public void startElement(final String uri, final String localName, final String qName, final Attributes atts) throws SAXException { + // Increment the current depth because start a new XML element. + int newDepth = ++depth; + // Output the element and its attributes if it is + // not the root element. + if (newDepth > splitDepth) { + sb.append("<"); + sb.append(qName); + + int attCount = atts.getLength(); + for (int i = 0; i < attCount; i++) { + String attName = atts.getQName(i); + String attValue = StringEscapeUtils.escapeXml10(atts.getValue(i)); + sb.append(" ").append(attName).append("=").append("\"").append(attValue).append("\""); + } + + sb.append(">"); + } + } + + @Override + public void startPrefixMapping(String prefix, String uri) throws SAXException { + } + } + + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java index 0000000,7385918..5e251f6 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java @@@ -1,0 -1,194 +1,194 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import java.io.File; + import java.io.IOException; + import java.io.InputStream; + import java.io.OutputStream; + import java.util.ArrayList; + import java.util.Collections; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.concurrent.TimeUnit; + + import javax.xml.transform.Transformer; + import javax.xml.transform.TransformerFactory; + import javax.xml.transform.stream.StreamResult; + import javax.xml.transform.stream.StreamSource; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.ValidationContext; + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.components.Validator; + import org.apache.nifi.expression.AttributeExpression; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.stream.io.BufferedInputStream; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.EventDriven; -import org.apache.nifi.processor.annotation.SideEffectFree; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.behavior.EventDriven; ++import org.apache.nifi.annotation.behavior.SideEffectFree; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processor.io.StreamCallback; + import org.apache.nifi.processor.util.StandardValidators; + import org.apache.nifi.util.StopWatch; + import org.apache.nifi.util.Tuple; + + @EventDriven + @SideEffectFree + @SupportsBatching + @Tags({"xml", "xslt", "transform"}) + @CapabilityDescription("Applies the provided XSLT file to the flowfile XML payload. A new FlowFile is created " + + "with transformed content and is routed to the 'success' relationship. 
If the XSL transform " + + "fails, the original FlowFile is routed to the 'failure' relationship") + public class TransformXml extends AbstractProcessor { + + public static final PropertyDescriptor XSLT_FILE_NAME = new PropertyDescriptor.Builder() + .name("XSLT file name") + .description("Provides the name (including full path) of the XSLT file to apply to the flowfile XML content.") + .required(true) + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("The FlowFile with transformed content will be routed to this relationship").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid XML), it will be routed to this relationship").build(); + + private List<PropertyDescriptor> properties; + private Set<Relationship> relationships; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(XSLT_FILE_NAME); + this.properties = Collections.unmodifiableList(properties); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .required(false) + .dynamic(true) + .build(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + final FlowFile original = session.get(); + if (original == null) { + return; + } + + final ProcessorLog logger = getLogger(); + final StopWatch stopWatch = new StopWatch(true); + + try { + FlowFile transformed = session.write(original, new StreamCallback() { + @Override + public void process(final InputStream rawIn, final OutputStream out) throws IOException { + try (final InputStream in = new BufferedInputStream(rawIn)) { + + File stylesheet = new File(context.getProperty(XSLT_FILE_NAME).getValue()); + StreamSource styleSource = new StreamSource(stylesheet); + TransformerFactory tfactory = new net.sf.saxon.TransformerFactoryImpl(); + Transformer transformer = tfactory.newTransformer(styleSource); + + // pass all dynamic properties to the transformer + for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { + if (entry.getKey().isDynamic()) { + String value = context.newPropertyValue(entry.getValue()).evaluateAttributeExpressions(original).getValue(); + transformer.setParameter(entry.getKey().getName(), value); + } + } + + // use a StreamSource with Saxon + StreamSource source = new StreamSource(in); + StreamResult result = new StreamResult(out); + transformer.transform(source, result); + } catch (final Exception e) { + throw new IOException(e); + } + } + }); + session.transfer(transformed, REL_SUCCESS); + 
session.getProvenanceReporter().modifyContent(transformed, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + logger.info("Transformed {}", new Object[]{original}); + } catch (Exception e) { + logger.error("Unable to transform {} due to {}", new Object[]{original, e}); + session.transfer(original, REL_FAILURE); + } + } + + @SuppressWarnings("unused") + private static final class XsltValidator implements Validator { + + private volatile Tuple<String, ValidationResult> cachedResult; + + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext validationContext) { + final Tuple<String, ValidationResult> lastResult = this.cachedResult; + if (lastResult != null && lastResult.getKey().equals(input)) { + return lastResult.getValue(); + } else { + String error = null; + final File stylesheet = new File(input); + final TransformerFactory tfactory = new net.sf.saxon.TransformerFactoryImpl(); + final StreamSource styleSource = new StreamSource(stylesheet); + + try { + tfactory.newTransformer(styleSource); + } catch (final Exception e) { + error = e.toString(); + } + + this.cachedResult = new Tuple<>(input, + new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(error == null) + .explanation(error).build()); + return this.cachedResult.getValue(); + } + } + } + + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java index 0000000,dc6daea..b30e780 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java @@@ -1,0 -1,427 +1,427 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import java.io.File; + import java.io.IOException; + import java.io.InputStream; + import java.io.OutputStream; + import java.nio.file.Path; + import java.util.ArrayList; + import java.util.Collections; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.UUID; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.stream.io.BufferedInputStream; + import org.apache.nifi.stream.io.BufferedOutputStream; + import org.apache.nifi.stream.io.StreamUtils; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.EventDriven; -import org.apache.nifi.processor.annotation.SideEffectFree; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.behavior.EventDriven; ++import org.apache.nifi.annotation.behavior.SideEffectFree; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.io.InputStreamCallback; + import org.apache.nifi.processor.io.OutputStreamCallback; + import org.apache.nifi.util.FlowFileUnpackager; + import org.apache.nifi.util.FlowFileUnpackagerV1; + import org.apache.nifi.util.FlowFileUnpackagerV2; + import org.apache.nifi.util.FlowFileUnpackagerV3; + import org.apache.nifi.util.ObjectHolder; + + import org.apache.commons.compress.archivers.ArchiveEntry; + import org.apache.commons.compress.archivers.tar.TarArchiveEntry; + import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; + import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream; + + @EventDriven + @SideEffectFree + @SupportsBatching + @Tags({"Unpack", "un-merge", "tar", "zip", "archive", "flowfile-stream", "flowfile-stream-v3"}) + @CapabilityDescription("Unpacks the content of FlowFiles that have been packaged with one of several different Packaging Formats, emitting one to many FlowFiles for each input FlowFile") + public class UnpackContent extends AbstractProcessor { + + public static final String AUTO_DETECT_FORMAT = "use mime.type attribute"; + public static final String TAR_FORMAT = "tar"; + public static final String ZIP_FORMAT = "zip"; + public static final String FLOWFILE_STREAM_FORMAT_V3 = "flowfile-stream-v3"; + public static final String FLOWFILE_STREAM_FORMAT_V2 = "flowfile-stream-v2"; + public static final String FLOWFILE_TAR_FORMAT = "flowfile-tar-v1"; + + // attribute keys + public static final String FRAGMENT_ID = "fragment.identifier"; + public static final String FRAGMENT_INDEX = "fragment.index"; + public static final String FRAGMENT_COUNT = "fragment.count"; + public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename"; + + public static final String OCTET_STREAM = 
"application/octet-stream"; + + public static final PropertyDescriptor PACKAGING_FORMAT = new PropertyDescriptor.Builder() + .name("Packaging Format") + .description("The Packaging Format used to create the file") + .required(true) + .allowableValues(AUTO_DETECT_FORMAT, TAR_FORMAT, ZIP_FORMAT, FLOWFILE_STREAM_FORMAT_V3, FLOWFILE_STREAM_FORMAT_V2, FLOWFILE_TAR_FORMAT) + .defaultValue(AUTO_DETECT_FORMAT) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Unpacked FlowFiles are sent to this relationship").build(); + public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile is sent to this relationship after it has been successfully unpacked").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("The original FlowFile is sent to this relationship when it cannot be unpacked for some reason").build(); + + private Set<Relationship> relationships; + private List<PropertyDescriptor> properties; + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_ORIGINAL); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(PACKAGING_FORMAT); + this.properties = Collections.unmodifiableList(properties); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ProcessorLog logger = getLogger(); + String packagingFormat = context.getProperty(PACKAGING_FORMAT).getValue().toLowerCase(); + if (AUTO_DETECT_FORMAT.equals(packagingFormat)) { + final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()); + if (mimeType == null) { + logger.error("No mime.type attribute set for {}; routing to failure", new Object[]{flowFile}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + switch (mimeType.toLowerCase()) { + case "application/tar": + packagingFormat = TAR_FORMAT; + break; + case "application/zip": + packagingFormat = ZIP_FORMAT; + break; + case "application/flowfile-v3": + packagingFormat = FLOWFILE_STREAM_FORMAT_V3; + break; + case "application/flowfile-v2": + packagingFormat = FLOWFILE_STREAM_FORMAT_V2; + break; + case "application/flowfile-v1": + packagingFormat = FLOWFILE_TAR_FORMAT; + break; + default: { + logger.info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", new Object[]{flowFile, mimeType}); + session.transfer(flowFile, REL_SUCCESS); + return; + } + } + } + + final Unpacker unpacker; + final boolean addFragmentAttrs; + switch (packagingFormat) { + case TAR_FORMAT: + unpacker = new TarUnpacker(); + addFragmentAttrs = true; + break; + case ZIP_FORMAT: + unpacker = new ZipUnpacker(); + addFragmentAttrs = true; + break; + case FLOWFILE_STREAM_FORMAT_V2: + unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2()); + addFragmentAttrs = false; + break; + case 
FLOWFILE_STREAM_FORMAT_V3: + unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3()); + addFragmentAttrs = false; + break; + case FLOWFILE_TAR_FORMAT: + unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1()); + addFragmentAttrs = false; + break; + default: + throw new AssertionError("Packaging Format was " + context.getProperty(PACKAGING_FORMAT).getValue()); + } + + final List<FlowFile> unpacked = new ArrayList<>(); + try { + unpacker.unpack(session, flowFile, unpacked); + if (unpacked.isEmpty()) { + logger.error("Unable to unpack {} because it does not appear to have any entries; routing to failure", new Object[]{flowFile}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + if (addFragmentAttrs) { + finishFragmentAttributes(session, flowFile, unpacked); + } + session.transfer(unpacked, REL_SUCCESS); + session.transfer(flowFile, REL_ORIGINAL); + session.getProvenanceReporter().fork(flowFile, unpacked); + logger.info("Unpacked {} into {} and transferred to success", new Object[]{flowFile, unpacked}); + } catch (final ProcessException e) { + logger.error("Unable to unpack {} due to {}; routing to failure", new Object[]{flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + session.remove(unpacked); + } + } + + private static interface Unpacker { + + void unpack(ProcessSession session, FlowFile source, List<FlowFile> unpacked); + } + + private static class TarUnpacker implements Unpacker { + + @Override + public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) { + final String fragmentId = UUID.randomUUID().toString(); + session.read(source, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + int fragmentCount = 0; + try (final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(in))) { + TarArchiveEntry tarEntry; + while ((tarEntry = tarIn.getNextTarEntry()) != null) { + if (tarEntry.isDirectory()) { + continue; + } + final File file = new File(tarEntry.getName()); + final Path filePath = file.toPath(); + final String filePathString = filePath.getParent() + "/"; + final Path absPath = filePath.toAbsolutePath(); + final String absPathString = absPath.getParent().toString() + "/"; + + FlowFile unpackedFile = session.create(source); + try { + final Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), file.getName()); + attributes.put(CoreAttributes.PATH.key(), filePathString); + attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString); + attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); + + attributes.put(FRAGMENT_ID, fragmentId); + attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount)); + + unpackedFile = session.putAllAttributes(unpackedFile, attributes); + + final long fileSize = tarEntry.getSize(); + unpackedFile = session.write(unpackedFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + StreamUtils.copy(tarIn, out, fileSize); + } + }); + } finally { + unpacked.add(unpackedFile); + } + } + } + } + }); + } + } + + private static class ZipUnpacker implements Unpacker { + + @Override + public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) { + final String fragmentId = UUID.randomUUID().toString(); + session.read(source, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + int fragmentCount = 
0; + try (final ZipArchiveInputStream zipIn = new ZipArchiveInputStream(new BufferedInputStream(in))) { + ArchiveEntry zipEntry; + while ((zipEntry = zipIn.getNextEntry()) != null) { + if (zipEntry.isDirectory()) { + continue; + } + final File file = new File(zipEntry.getName()); + final String parentDirectory = (file.getParent() == null) ? "/" : file.getParent(); + final Path absPath = file.toPath().toAbsolutePath(); + final String absPathString = absPath.getParent().toString() + "/"; + + FlowFile unpackedFile = session.create(source); + try { + final Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), file.getName()); + attributes.put(CoreAttributes.PATH.key(), parentDirectory); + attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString); + attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); + + attributes.put(FRAGMENT_ID, fragmentId); + attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount)); + + unpackedFile = session.putAllAttributes(unpackedFile, attributes); + unpackedFile = session.write(unpackedFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + StreamUtils.copy(zipIn, out); + } + }); + } finally { + unpacked.add(unpackedFile); + } + } + } + } + }); + } + } + + private static class FlowFileStreamUnpacker implements Unpacker { + + private final FlowFileUnpackager unpackager; + + public FlowFileStreamUnpacker(final FlowFileUnpackager unpackager) { + this.unpackager = unpackager; + } + + @Override + public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) { + session.read(source, new InputStreamCallback() { + @Override + public void process(final InputStream rawIn) throws IOException { + try (final InputStream in = new BufferedInputStream(rawIn)) { + while (unpackager.hasMoreData()) { + final ObjectHolder<Map<String, String>> attributesRef = new ObjectHolder<>(null); + FlowFile unpackedFile = session.create(source); + try { + unpackedFile = session.write(unpackedFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream rawOut) throws IOException { + try (final OutputStream out = new BufferedOutputStream(rawOut)) { + final Map<String, String> attributes = unpackager.unpackageFlowFile(in, out); + if (attributes == null) { + throw new IOException("Failed to unpack " + source + ": stream had no Attributes"); + } + attributesRef.set(attributes); + } + } + }); + + final Map<String, String> attributes = attributesRef.get(); + + // Remove the UUID from the attributes because we don't want to use the same UUID for this FlowFile. + // If we do, then we get into a weird situation if we use MergeContent to create a FlowFile Package + // and later unpack it -- in this case, we have two FlowFiles with the same UUID. 
+ attributes.remove(CoreAttributes.UUID.key()); + + // maintain backward compatibility with legacy NiFi attribute names + mapAttributes(attributes, "nf.file.name", CoreAttributes.FILENAME.key()); + mapAttributes(attributes, "nf.file.path", CoreAttributes.PATH.key()); + mapAttributes(attributes, "content-encoding", CoreAttributes.MIME_TYPE.key()); + mapAttributes(attributes, "content-type", CoreAttributes.MIME_TYPE.key()); + + if (!attributes.containsKey(CoreAttributes.MIME_TYPE.key())) { + attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); + } + + unpackedFile = session.putAllAttributes(unpackedFile, attributes); + } finally { + unpacked.add(unpackedFile); + } + } + } + } + }); + } + } + + /** + * Maps attributes from legacy nifi to the new naming scheme + * + * @param attributes + * @param oldKey + * @param newKey + */ + private static void mapAttributes(final Map<String, String> attributes, final String oldKey, final String newKey) { + if (!attributes.containsKey(newKey) && attributes.containsKey(oldKey)) { + attributes.put(newKey, attributes.get(oldKey)); + } + } + + /** + * If the unpacked flowfiles contain fragment index attributes, then we need + * to apply fragment count and other attributes for completeness. + * + * @param session + * @param source + * @param unpacked + */ + private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) { + // first pass verifies all FlowFiles have the FRAGMENT_INDEX attribute and gets the total number of fragments + int fragmentCount = 0; + for (FlowFile ff : unpacked) { + String fragmentIndex = ff.getAttribute(FRAGMENT_INDEX); + if (fragmentIndex != null) { + fragmentCount++; + } else { + return; + } + } + + String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key()); + if (originalFilename.endsWith(".tar") || originalFilename.endsWith(".zip") || originalFilename.endsWith(".pkg")) { + originalFilename = originalFilename.substring(0, originalFilename.length() - 4); + } + + // second pass adds fragment attributes + ArrayList<FlowFile> newList = new ArrayList<>(unpacked); + unpacked.clear(); + for (FlowFile ff : newList) { + final Map<String, String> attributes = new HashMap<>(); + attributes.put(FRAGMENT_COUNT, String.valueOf(fragmentCount)); + attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFilename); + FlowFile newFF = session.putAllAttributes(ff, attributes); + unpacked.add(newFF); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateXml.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateXml.java index 0000000,8f2001a..4808a59 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateXml.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateXml.java @@@ -1,0 -1,147 +1,147 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicReference;
+
+ import javax.xml.transform.stream.StreamSource;
+ import javax.xml.validation.Schema;
+ import javax.xml.validation.SchemaFactory;
+ import javax.xml.validation.Validator;
+
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.annotation.CapabilityDescription;
-import org.apache.nifi.processor.annotation.EventDriven;
-import org.apache.nifi.processor.annotation.OnScheduled;
-import org.apache.nifi.processor.annotation.SideEffectFree;
-import org.apache.nifi.processor.annotation.SupportsBatching;
-import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.BooleanHolder;
+
+ import org.xml.sax.SAXException;
+
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"xml", "schema", "validation", "xsd"})
+ @CapabilityDescription("Validates the contents of FlowFiles against a user-specified XML Schema file")
+ public class ValidateXml extends AbstractProcessor {
+
+ public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
+ .name("Schema File")
+ .description("The path to the Schema file that is to be used for validation")
+ .required(true)
+ .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+ .build();
+
+ public static final Relationship REL_VALID = new Relationship.Builder().name("valid").description("FlowFiles that are successfully validated against the schema are routed to this relationship").build();
+ public static final Relationship REL_INVALID = new Relationship.Builder().name("invalid").description("FlowFiles that are not valid according to the specified schema are routed to this relationship").build();
+
+ private static final String SCHEMA_LANGUAGE = "http://www.w3.org/2001/XMLSchema";
+
+ private List<PropertyDescriptor> properties;
+ private Set<Relationship> relationships;
+ private final AtomicReference<Schema> schemaRef = new AtomicReference<>();
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(SCHEMA_FILE);
+ this.properties = Collections.unmodifiableList(properties);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_VALID);
+ relationships.add(REL_INVALID);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @OnScheduled
+ public void parseSchema(final ProcessContext context) throws IOException, SAXException {
+ // parse the configured schema once per schedule and cache it for onTrigger;
+ // a SAXException here surfaces to the framework and fails scheduling
+ final File file = new File(context.getProperty(SCHEMA_FILE).getValue());
+ final SchemaFactory schemaFactory = SchemaFactory.newInstance(SCHEMA_LANGUAGE);
+ final Schema schema = schemaFactory.newSchema(file);
+ this.schemaRef.set(schema);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ final List<FlowFile> flowFiles = session.get(50);
+ if (flowFiles.isEmpty()) {
+ return;
+ }
+
+ final Schema schema = schemaRef.get();
+ final Validator validator = schema.newValidator();
+ final ProcessorLog logger = getLogger();
+
+ for (final FlowFile flowFile : flowFiles) {
+ final BooleanHolder valid = new BooleanHolder(true);
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ try {
+ validator.validate(new StreamSource(in));
+ } catch (final IllegalArgumentException | SAXException e) {
+ valid.set(false);
+ logger.debug("Failed to validate {} against schema due to {}", new Object[]{flowFile, e});
+ }
+ }
+ });
+
+ if (valid.get()) {
+ logger.info("Successfully validated {} against schema; routing to 'valid'", new Object[]{flowFile});
+ session.getProvenanceReporter().route(flowFile, REL_VALID);
+ session.transfer(flowFile, REL_VALID);
+ } else {
+ logger.info("Failed to validate {} against schema; routing to 'invalid'", new Object[]{flowFile});
+ session.getProvenanceReporter().route(flowFile, REL_INVALID);
+ session.transfer(flowFile, REL_INVALID);
+ }
+ }
+ }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/RESTServiceContentModified.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/RESTServiceContentModified.java
index 0000000,8548c46..ec3211c
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/RESTServiceContentModified.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/RESTServiceContentModified.java
@@@ -1,0 -1,78 +1,78 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
+ import java.io.IOException;
+ import java.text.SimpleDateFormat;
+ import java.util.Locale;
+ import java.util.TimeZone;
+
+ import javax.servlet.http.HttpServlet;
+ import javax.servlet.http.HttpServletRequest;
+ import javax.servlet.http.HttpServletResponse;
+
+ public class RESTServiceContentModified extends HttpServlet {
+
+ private static final long serialVersionUID = 1L;
+ static String result = "[\"sample1\",\"sample2\",\"sample3\",\"sample4\"]";
+ static long modificationDate = System.currentTimeMillis() / 1000 * 1000; // time resolution is to the second
+ static int ETAG;
+ public static boolean IGNORE_ETAG = false;
+ public static boolean IGNORE_LAST_MODIFIED = false;
+
+ public RESTServiceContentModified() {
- this.ETAG = this.hashCode();
++ ETAG = this.hashCode();
+ }
+
+ @Override
+ public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+ String ifModifiedSince = request.getHeader("If-Modified-Since");
+ String ifNoneMatch = request.getHeader("If-None-Match");
+
+ final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss zzz", Locale.US);
+ dateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+ response.setContentType("application/json");
+ if (ifNoneMatch != null && ifNoneMatch.length() > 0 && !IGNORE_ETAG && Integer.parseInt(ifNoneMatch) == ETAG) {
+ response.setStatus(304);
+ response.setHeader("Last-Modified", dateFormat.format(modificationDate));
+ response.setHeader("ETag", Integer.toString(ETAG));
+ return;
+ }
+
+ long date = -1;
+ if (ifModifiedSince != null && ifModifiedSince.length() > 0 && !IGNORE_LAST_MODIFIED) {
+ try {
+ date = dateFormat.parse(ifModifiedSince).getTime();
+ } catch (Exception e) {
+ // ignore an unparseable date: date stays -1 and the full content is returned
+ }
+ }
+ if (date >= modificationDate) {
+ response.setStatus(304);
+ response.setHeader("Last-Modified", dateFormat.format(modificationDate));
+ response.setHeader("ETag", Integer.toString(ETAG));
+ return;
+ }
+
+ response.setStatus(200);
+ response.setHeader("Last-Modified", dateFormat.format(modificationDate));
+ response.setHeader("ETag", Integer.toString(ETAG));
+ response.getOutputStream().println(result);
+ }
+
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
index 0000000,1af48d6..71c8583
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
@@@ -1,0 -1,111 +1,112 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
++import static org.junit.Assert.assertTrue;
++
+ import java.io.IOException;
-import java.nio.file.Files;
+ import java.nio.file.Paths;
++
+ import org.apache.nifi.util.MockFlowFile;
+ import org.apache.nifi.util.TestRunner;
+ import org.apache.nifi.util.TestRunners;
-import static org.junit.Assert.assertTrue;
+ import org.junit.Test;
+
+ public class TestCompressContent {
+
+ @Test
+ public void testBzip2DecompressConcatenated() throws Exception {
+ final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+ runner.setProperty(CompressContent.MODE, "decompress");
+ runner.setProperty(CompressContent.COMPRESSION_FORMAT, "bzip2");
+ runner.setProperty(CompressContent.UPDATE_FILENAME, "false");
+
+ runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFileConcat.txt.bz2"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+ flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFileConcat.txt"));
+ flowFile.assertAttributeEquals("filename", "SampleFileConcat.txt.bz2"); // not updating filename
+ }
+
+ @Test
+ public void testBzip2Decompress() throws Exception {
+ final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+ runner.setProperty(CompressContent.MODE, "decompress");
+ runner.setProperty(CompressContent.COMPRESSION_FORMAT, "bzip2");
+ runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
+
+ runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.bz2"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+ flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+ flowFile.assertAttributeEquals("filename", "SampleFile.txt");
+
+ runner.clearTransferState();
+ runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile1.txt.bz2"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+ flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+ flowFile.assertAttributeEquals("filename", "SampleFile1.txt");
+ }
+
+ @Test
+ public void testGzipDecompress() throws Exception {
+ final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+ runner.setProperty(CompressContent.MODE, "decompress");
+ runner.setProperty(CompressContent.COMPRESSION_FORMAT, "gzip");
+ assertTrue(runner.setProperty(CompressContent.UPDATE_FILENAME, "true").isValid());
+
+ runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.gz"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+ flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+ flowFile.assertAttributeEquals("filename", "SampleFile.txt");
+
+ runner.clearTransferState();
+ runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile1.txt.gz"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+ flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+ flowFile.assertAttributeEquals("filename", "SampleFile1.txt");
+ }
+
+ @Test
+ public void testFilenameUpdatedOnCompress() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+ runner.setProperty(CompressContent.MODE, "compress");
+ runner.setProperty(CompressContent.COMPRESSION_FORMAT, "gzip");
+ assertTrue(runner.setProperty(CompressContent.UPDATE_FILENAME, "true").isValid());
+
+ runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("filename", "SampleFile.txt.gz");
+
+ }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertCharacterSet.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertCharacterSet.java
index 0000000,6092761..1b057d9
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertCharacterSet.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertCharacterSet.java
@@@ -1,0 -1,47 +1,44 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
-import org.apache.nifi.processors.standard.ConvertCharacterSet;
+ import java.io.File;
+ import java.io.IOException;
-import java.nio.file.Files;
+ import java.nio.file.Paths;
+
+ import org.apache.nifi.util.MockFlowFile;
+ import org.apache.nifi.util.TestRunner;
+ import org.apache.nifi.util.TestRunners;
-
+ import org.junit.Test;
+
+ public class TestConvertCharacterSet {
+
+ @Test
+ public void test() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new ConvertCharacterSet());
+ runner.setProperty(ConvertCharacterSet.INPUT_CHARSET, "ASCII");
+ runner.setProperty(ConvertCharacterSet.OUTPUT_CHARSET, "UTF-32");
+
+ runner.enqueue(Paths.get("src/test/resources/CharacterSetConversionSamples/Original.txt"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertCharacterSet.REL_SUCCESS, 1);
+ final MockFlowFile output = runner.getFlowFilesForRelationship(ConvertCharacterSet.REL_SUCCESS).get(0);
+ output.assertContentEquals(new File("src/test/resources/CharacterSetConversionSamples/Converted2.txt"));
+ }
+
+ }
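
A note for reviewers: the commit migrates ValidateXml's annotations but includes no unit test for it. For illustration only, a test in the same TestRunner style as TestCompressContent and TestConvertCharacterSet above might look like the sketch below. The class name TestValidateXml and both resource paths are hypothetical placeholders, not files from this repository; any XSD on disk (SCHEMA_FILE uses FILE_EXISTS_VALIDATOR) plus a document conforming to it would do.

package org.apache.nifi.processors.standard;

import java.nio.file.Paths;

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;

public class TestValidateXml {

    @Test
    public void testValidXmlRoutedToValid() throws Exception {
        final TestRunner runner = TestRunners.newTestRunner(new ValidateXml());
        // hypothetical schema path; parseSchema() runs at scheduling time via @OnScheduled
        runner.setProperty(ValidateXml.SCHEMA_FILE, "src/test/resources/TestXml/schema.xsd");

        // hypothetical document that conforms to the schema above
        runner.enqueue(Paths.get("src/test/resources/TestXml/valid.xml"));
        runner.run();

        // schema-conformant content is routed to the 'valid' relationship
        runner.assertAllFlowFilesTransferred(ValidateXml.REL_VALID, 1);
    }
}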
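Similarly, RESTServiceContentModified stubs the standard HTTP conditional-GET contract: a request whose If-None-Match value equals the current ETag, or whose If-Modified-Since date is not older than the modification date, receives a 304 with no body. A minimal client sketch using only java.net, assuming a hypothetical local deployment of the servlet:

import java.net.HttpURLConnection;
import java.net.URL;

public class ConditionalGetSketch {

    public static void main(final String[] args) throws Exception {
        // hypothetical URL; in practice the servlet is deployed by the HTTP processor tests
        final URL url = new URL("http://localhost:8080/content");

        // first request carries no validators: expect 200 plus ETag/Last-Modified headers
        final HttpURLConnection first = (HttpURLConnection) url.openConnection();
        final String etag = first.getHeaderField("ETag");
        final String lastModified = first.getHeaderField("Last-Modified");
        System.out.println(first.getResponseCode() + " ETag=" + etag);

        // second request replays the validators: expect 304 while the content is unchanged
        final HttpURLConnection second = (HttpURLConnection) url.openConnection();
        second.setRequestProperty("If-None-Match", etag);
        second.setRequestProperty("If-Modified-Since", lastModified);
        System.out.println(second.getResponseCode()); // 304
    }
}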
