http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXPath.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXPath.java index 0000000,a1fc86d..4827ee3 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXPath.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXPath.java @@@ -1,0 -1,429 +1,429 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import static javax.xml.xpath.XPathConstants.NODESET; + import static javax.xml.xpath.XPathConstants.STRING; + + import java.io.ByteArrayOutputStream; + import java.io.IOException; + import java.io.InputStream; + import java.io.OutputStream; + import java.io.UnsupportedEncodingException; + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Properties; + import java.util.Set; + import java.util.concurrent.atomic.AtomicReference; + + import javax.xml.namespace.QName; + import javax.xml.transform.ErrorListener; + import javax.xml.transform.OutputKeys; + import javax.xml.transform.Source; + import javax.xml.transform.Transformer; + import javax.xml.transform.TransformerException; + import javax.xml.transform.TransformerFactory; + import javax.xml.transform.TransformerFactoryConfigurationError; + import javax.xml.transform.stream.StreamResult; + import javax.xml.xpath.XPathExpression; + import javax.xml.xpath.XPathExpressionException; + import javax.xml.xpath.XPathFactory; + import javax.xml.xpath.XPathFactoryConfigurationException; + + import net.sf.saxon.lib.NamespaceConstant; + import net.sf.saxon.xpath.XPathEvaluator; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.ValidationContext; + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.components.Validator; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.stream.io.BufferedInputStream; + import org.apache.nifi.stream.io.BufferedOutputStream; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessorInitializationContext; + 
import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.EventDriven; -import org.apache.nifi.processor.annotation.OnScheduled; -import org.apache.nifi.processor.annotation.SideEffectFree; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.behavior.EventDriven; ++import org.apache.nifi.annotation.lifecycle.OnScheduled; ++import org.apache.nifi.annotation.behavior.SideEffectFree; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.io.InputStreamCallback; + import org.apache.nifi.processor.io.OutputStreamCallback; + import org.apache.nifi.util.ObjectHolder; + import org.xml.sax.InputSource; + + @EventDriven + @SideEffectFree + @SupportsBatching + @Tags({"XML", "evaluate", "XPath"}) + @CapabilityDescription("Evaluates one or more XPaths against the content of a FlowFile. The results of those XPaths are assigned to " + + "FlowFile Attributes or are written to the content of the FlowFile itself, depending on configuration of the " + + "Processor. XPaths are entered by adding user-defined properties; the name of the property maps to the Attribute " + + "Name into which the result will be placed (if the Destination is flowfile-content; otherwise, the property name is ignored). " + + "The value of the property must be a valid XPath expression. If the XPath evaluates to more than one node and the Return Type is " + + "set to 'nodeset' (either directly, or via 'auto-detect' with a Destination of " + + "'flowfile-content', the FlowFile will be unmodified and will be routed to failure. 
If the XPath does not " + + "evaluate to a Node, the FlowFile will be routed to 'unmatched' without having its contents modified. If Destination is " + + "flowfile-attribute and the expression matches nothing, attributes will be created with empty strings as the value, and the " + + "FlowFile will always be routed to 'matched'") + public class EvaluateXPath extends AbstractProcessor { + + public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute"; + public static final String DESTINATION_CONTENT = "flowfile-content"; + public static final String RETURN_TYPE_AUTO = "auto-detect"; + public static final String RETURN_TYPE_NODESET = "nodeset"; + public static final String RETURN_TYPE_STRING = "string"; + + public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() + .name("Destination") + .description("Indicates whether the results of the XPath evaluation are written to the FlowFile content or a FlowFile attribute; if using attribute, must specify the Attribute Name property. If set to flowfile-content, only one XPath may be specified, and the property name is ignored.") + .required(true) + .allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTE) + .defaultValue(DESTINATION_CONTENT) + .build(); + + public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor.Builder() + .name("Return Type") + .description("Indicates the desired return type of the Xpath expressions. 
Selecting 'auto-detect' will set the return type to 'nodeset' for a Destination of 'flowfile-content', and 'string' for a Destination of 'flowfile-attribute'.") + .required(true) + .allowableValues(RETURN_TYPE_AUTO, RETURN_TYPE_NODESET, RETURN_TYPE_STRING) + .defaultValue(RETURN_TYPE_AUTO) + .build(); + + public static final Relationship REL_MATCH = new Relationship.Builder().name("matched").description("FlowFiles are routed to this relationship when the XPath is successfully evaluated and the FlowFile is modified as a result").build(); + public static final Relationship REL_NO_MATCH = new Relationship.Builder().name("unmatched").description("FlowFiles are routed to this relationship when the XPath does not match the content of the FlowFile and the Destination is set to flowfile-content").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles are routed to this relationship when the XPath cannot be evaluated against the content of the FlowFile; for instance, if the FlowFile is not valid XML, or if the Return Type is 'nodeset' and the XPath evaluates to multiple nodes").build(); + + private Set<Relationship> relationships; + private List<PropertyDescriptor> properties; + + private final AtomicReference<XPathFactory> factoryRef = new AtomicReference<>(); + + static { + System.setProperty("javax.xml.xpath.XPathFactory:" + NamespaceConstant.OBJECT_MODEL_SAXON, + "net.sf.saxon.xpath.XPathFactoryImpl"); + } + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_MATCH); + relationships.add(REL_NO_MATCH); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(DESTINATION); + properties.add(RETURN_TYPE); + this.properties = Collections.unmodifiableList(properties); + } 
+ + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext context) { + final List<ValidationResult> results = new ArrayList<>(super.customValidate(context)); + + final String destination = context.getProperty(DESTINATION).getValue(); + if (DESTINATION_CONTENT.equals(destination)) { + int xpathCount = 0; + + for (final PropertyDescriptor desc : context.getProperties().keySet()) { + if (desc.isDynamic()) { + xpathCount++; + } + } + + if (xpathCount != 1) { + results.add(new ValidationResult.Builder().subject("XPaths").valid(false).explanation("Exactly one XPath must be set if using destination of " + DESTINATION_CONTENT).build()); + } + } + + return results; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @OnScheduled + public void initializeXPathFactory() throws XPathFactoryConfigurationException { + factoryRef.set(XPathFactory.newInstance(NamespaceConstant.OBJECT_MODEL_SAXON)); + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .expressionLanguageSupported(false) + .addValidator(new XPathValidator()) + .required(false) + .dynamic(true) + .build(); + } + + @Override + @SuppressWarnings("unchecked") + public void onTrigger(final ProcessContext context, final ProcessSession session) { + final List<FlowFile> flowFiles = session.get(50); + if (flowFiles.isEmpty()) { + return; + } + + final ProcessorLog logger = getLogger(); + final XPathFactory factory = factoryRef.get(); + final XPathEvaluator xpathEvaluator = (XPathEvaluator) factory.newXPath(); + final Map<String, XPathExpression> attributeToXPathMap = new HashMap<>(); + + for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { + if 
(!entry.getKey().isDynamic()) { + continue; + } + final XPathExpression xpathExpression; + try { + xpathExpression = xpathEvaluator.compile(entry.getValue()); + attributeToXPathMap.put(entry.getKey().getName(), xpathExpression); + } catch (XPathExpressionException e) { + throw new ProcessException(e); // should not happen because we've already validated the XPath (in XPathValidator) + } + } + + final XPathExpression slashExpression; + try { + slashExpression = xpathEvaluator.compile("/"); + } catch (XPathExpressionException e) { + logger.error("unable to compile XPath expression due to {}", new Object[]{e}); + session.transfer(flowFiles, REL_FAILURE); + return; + } + + final String destination = context.getProperty(DESTINATION).getValue(); + final QName returnType; + + switch (context.getProperty(RETURN_TYPE).getValue()) { + case RETURN_TYPE_AUTO: + if (DESTINATION_ATTRIBUTE.equals(destination)) { + returnType = STRING; + } else if (DESTINATION_CONTENT.equals(destination)) { + returnType = NODESET; + } else { + throw new IllegalStateException("The only possible destinations should be CONTENT or ATTRIBUTE..."); + } + break; + case RETURN_TYPE_NODESET: + returnType = NODESET; + break; + case RETURN_TYPE_STRING: + returnType = STRING; + break; + default: + throw new IllegalStateException("There are no other return types..."); + } + + flowFileLoop: + for (FlowFile flowFile : flowFiles) { + final ObjectHolder<Throwable> error = new ObjectHolder<>(null); + final ObjectHolder<Source> sourceRef = new ObjectHolder<>(null); + + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream rawIn) throws IOException { + try (final InputStream in = new BufferedInputStream(rawIn)) { + final List<Source> rootList = (List<Source>) slashExpression.evaluate(new InputSource(in), NODESET); + sourceRef.set(rootList.get(0)); + } catch (final Exception e) { + error.set(e); + } + } + }); + + if (error.get() != null) { + logger.error("unable to 
evaluate XPath against {} due to {}; routing to 'failure'", new Object[]{flowFile, error.get()}); + session.transfer(flowFile, REL_FAILURE); + continue; + } + + final Map<String, String> xpathResults = new HashMap<>(); + + for (final Map.Entry<String, XPathExpression> entry : attributeToXPathMap.entrySet()) { + Object result = null; + try { + result = entry.getValue().evaluate(sourceRef.get(), returnType); + if (result == null) { + continue; + } + } catch (final XPathExpressionException e) { + logger.error("failed to evaluate XPath for {} for Property {} due to {}; routing to failure", new Object[]{flowFile, entry.getKey(), e}); + session.transfer(flowFile, REL_FAILURE); + continue flowFileLoop; + } + + if (returnType == NODESET) { + List<Source> nodeList = (List<Source>) result; + if (nodeList.isEmpty()) { + logger.info("Routing {} to 'unmatched'", new Object[]{flowFile}); + session.transfer(flowFile, REL_NO_MATCH); + continue flowFileLoop; + } else if (nodeList.size() > 1) { + logger.error("Routing {} to 'failure' because the XPath evaluated to {} XML nodes", new Object[]{ + flowFile, nodeList.size()}); + session.transfer(flowFile, REL_FAILURE); + continue flowFileLoop; + } + final Source sourceNode = nodeList.get(0); + + if (DESTINATION_ATTRIBUTE.equals(destination)) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + doTransform(sourceNode, baos); + xpathResults.put(entry.getKey(), baos.toString("UTF-8")); + } catch (UnsupportedEncodingException e) { + throw new ProcessException(e); // this REALLY shouldn't happen + } catch (TransformerException e) { + error.set(e); + } + + } else if (DESTINATION_CONTENT.equals(destination)) { + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream rawOut) throws IOException { + try (final OutputStream out = new BufferedOutputStream(rawOut)) { + doTransform(sourceNode, out); + } catch (TransformerException e) { + error.set(e); + } + } + }); + } + 
+ } else if (returnType == STRING) { + final String resultString = (String) result; + + if (DESTINATION_ATTRIBUTE.equals(destination)) { + xpathResults.put(entry.getKey(), resultString); + } else if (DESTINATION_CONTENT.equals(destination)) { + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream rawOut) throws IOException { + try (final OutputStream out = new BufferedOutputStream(rawOut)) { + out.write(resultString.getBytes("UTF-8")); + } + } + }); + } + } + } + + if (error.get() == null) { + if (DESTINATION_ATTRIBUTE.equals(destination)) { + flowFile = session.putAllAttributes(flowFile, xpathResults); + final Relationship destRel = xpathResults.isEmpty() ? REL_NO_MATCH : REL_MATCH; + logger.info("Successfully evaluated XPaths against {} and found {} matches; routing to {}", new Object[]{flowFile, + xpathResults.size(), destRel.getName()}); + session.transfer(flowFile, destRel); + session.getProvenanceReporter().modifyAttributes(flowFile); + } else if (DESTINATION_CONTENT.equals(destination)) { + logger.info("Successfully updated content for {}; routing to 'matched'", new Object[]{flowFile}); + session.transfer(flowFile, REL_MATCH); + session.getProvenanceReporter().modifyContent(flowFile); + } + } else { + logger.error("Failed to write XPath result for {} due to {}; routing original to 'failure'", new Object[]{flowFile, error.get()}); + session.transfer(flowFile, REL_FAILURE); + } + } + } + + private void doTransform(final Source sourceNode, OutputStream out) throws TransformerFactoryConfigurationError, TransformerException { + final Transformer transformer; + try { + transformer = TransformerFactory.newInstance().newTransformer(); + } catch (final Exception e) { + throw new ProcessException(e); + } + + final Properties props = new Properties(); + props.setProperty(OutputKeys.METHOD, "xml"); + props.setProperty(OutputKeys.INDENT, "no"); + props.setProperty(OutputKeys.OMIT_XML_DECLARATION, "no"); + 
transformer.setOutputProperties(props); + + final ProcessorLog logger = getLogger(); + + final ObjectHolder<TransformerException> error = new ObjectHolder<>(null); + transformer.setErrorListener(new ErrorListener() { + @Override + public void warning(final TransformerException exception) throws TransformerException { + logger.warn("Encountered warning from XPath Engine: ", new Object[] {exception.toString(), exception}); + } + + @Override + public void error(final TransformerException exception) throws TransformerException { + logger.error("Encountered error from XPath Engine: ", new Object[] {exception.toString(), exception}); + error.set(exception); + } + + @Override + public void fatalError(final TransformerException exception) throws TransformerException { + logger.error("Encountered warning from XPath Engine: ", new Object[] {exception.toString(), exception}); + error.set(exception); + } + }); + + transformer.transform(sourceNode, new StreamResult(out)); + if ( error.get() != null ) { + throw error.get(); + } + } + + private static class XPathValidator implements Validator { + + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext validationContext) { + try { + XPathFactory factory = XPathFactory.newInstance(NamespaceConstant.OBJECT_MODEL_SAXON); + final XPathEvaluator evaluator = (XPathEvaluator) factory.newXPath(); + + String error = null; + try { + evaluator.compile(input); + } catch (final Exception e) { + error = e.toString(); + } + + return new ValidationResult.Builder().input(input).subject(subject).valid(error == null).explanation(error).build(); + } catch (final Exception e) { + return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Unable to initialize XPath engine due to " + e.toString()).build(); + } + } + } + }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXQuery.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXQuery.java index 0000000,8b4ce09..3ddee83 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXQuery.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXQuery.java @@@ -1,0 -1,463 +1,463 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import java.io.ByteArrayOutputStream; + import java.io.IOException; + import java.io.InputStream; + import java.io.OutputStream; + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Properties; + import java.util.Set; + + import javax.xml.parsers.DocumentBuilderFactory; + import javax.xml.transform.OutputKeys; + import javax.xml.transform.Transformer; + import javax.xml.transform.TransformerConfigurationException; + import javax.xml.transform.TransformerException; + import javax.xml.transform.TransformerFactory; + import javax.xml.transform.TransformerFactoryConfigurationError; + import javax.xml.transform.sax.SAXSource; + import javax.xml.transform.stream.StreamResult; + + import net.sf.saxon.s9api.DOMDestination; + import net.sf.saxon.s9api.Processor; + import net.sf.saxon.s9api.SaxonApiException; + import net.sf.saxon.s9api.XQueryCompiler; + import net.sf.saxon.s9api.XQueryEvaluator; + import net.sf.saxon.s9api.XQueryExecutable; + import net.sf.saxon.s9api.XdmItem; + import net.sf.saxon.s9api.XdmNode; + import net.sf.saxon.s9api.XdmValue; + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.ValidationContext; + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.components.Validator; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.stream.io.BufferedInputStream; + import org.apache.nifi.stream.io.BufferedOutputStream; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; -import 
org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.EventDriven; -import org.apache.nifi.processor.annotation.SideEffectFree; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.behavior.EventDriven; ++import org.apache.nifi.annotation.behavior.SideEffectFree; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.io.InputStreamCallback; + import org.apache.nifi.processor.io.OutputStreamCallback; + import org.apache.nifi.processor.util.StandardValidators; + import org.apache.nifi.util.ObjectHolder; + + import org.w3c.dom.Document; + import org.xml.sax.InputSource; + + @EventDriven + @SideEffectFree + @SupportsBatching + @Tags({"XML", "evaluate", "XPath", "XQuery", "experimental"}) + @CapabilityDescription( + "Evaluates one or more XQueries against the content of a FlowFile. The results of those XQueries are assigned " + + "to FlowFile Attributes or are written to the content of the FlowFile itself, depending on configuration of " + + "the Processor. XQueries are entered by adding user-defined properties; the name of the property maps to the " + + "Attribute Name into which the result will be placed (if the Destination is 'flowfile-attribute'; otherwise, " + + "the property name is ignored). The value of the property must be a valid XQuery. If the XQuery returns more " + + "than one result, new attributes or FlowFiles (for Destinations of 'flowfile-attribute' or 'flowfile-content' " + + "respectively) will be created for each result (attributes will have a '.n' one-up number appended to the " + + "specified attribute name). 
If any provided XQuery returns a result, the FlowFile(s) will be routed to " + + "'matched'. If no provided XQuery returns a result, the FlowFile will be routed to 'unmatched'. If the " + + "Destination is 'flowfile-attribute' and the XQueries matche nothing, no attributes will be applied to the " + + "FlowFile.") + + public class EvaluateXQuery extends AbstractProcessor { + + public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute"; + public static final String DESTINATION_CONTENT = "flowfile-content"; + + public static final String OUTPUT_METHOD_XML = "xml"; + public static final String OUTPUT_METHOD_HTML = "html"; + public static final String OUTPUT_METHOD_TEXT = "text"; + + public static final String UTF8 = "UTF-8"; + + public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() + .name("Destination") + .description( + "Indicates whether the results of the XQuery evaluation are written to the FlowFile content or a " + + "FlowFile attribute. If set to <flowfile-content>, only one XQuery may be specified and the property " + + "name is ignored. 
If set to <flowfile-attribute> and the XQuery returns more than one result, " + + "multiple attributes will be added to theFlowFile, each named with a '.n' one-up number appended to " + + "the specified attribute name") + .required(true) + .allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTE) + .defaultValue(DESTINATION_CONTENT) + .build(); + + public static final PropertyDescriptor XML_OUTPUT_METHOD = new PropertyDescriptor.Builder() + .name("Output: Method") + .description("Identifies the overall method that should be used for outputting a result tree.") + .required(true) + .allowableValues(OUTPUT_METHOD_XML, OUTPUT_METHOD_HTML, OUTPUT_METHOD_TEXT) + .defaultValue(OUTPUT_METHOD_XML) + .build(); + + public static final PropertyDescriptor XML_OUTPUT_OMIT_XML_DECLARATION = new PropertyDescriptor.Builder() + .name("Output: Omit XML Declaration") + .description("Specifies whether the processor should output an XML declaration when transforming a result tree.") + .required(true) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("false") + .build(); + + public static final PropertyDescriptor XML_OUTPUT_INDENT = new PropertyDescriptor.Builder() + .name("Output: Indent") + .description("Specifies whether the processor may add additional whitespace when outputting a result tree.") + .required(true) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("false") + .build(); + + public static final Relationship REL_MATCH = new Relationship.Builder() + .name("matched") + .description( + "FlowFiles are routed to this relationship when the XQuery is successfully evaluated and the FlowFile " + + "is modified as a result") + .build(); + + public static final Relationship REL_NO_MATCH = new Relationship.Builder() + .name("unmatched") + .description( + "FlowFiles are routed to this relationship when the XQuery does not match the content of the FlowFile " + + "and the Destination is set to flowfile-content") + .build(); + + public static 
final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description( + "FlowFiles are routed to this relationship when the XQuery cannot be evaluated against the content of " + + "the FlowFile.") + .build(); + + private Set<Relationship> relationships; + private List<PropertyDescriptor> properties; + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_MATCH); + relationships.add(REL_NO_MATCH); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(DESTINATION); + properties.add(XML_OUTPUT_METHOD); + properties.add(XML_OUTPUT_OMIT_XML_DECLARATION); + properties.add(XML_OUTPUT_INDENT); + this.properties = Collections.unmodifiableList(properties); + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext context) { + final List<ValidationResult> results = new ArrayList<>(super.customValidate(context)); + + final String destination = context.getProperty(DESTINATION).getValue(); + if (DESTINATION_CONTENT.equals(destination)) { + int xQueryCount = 0; + for (final PropertyDescriptor desc : context.getProperties().keySet()) { + if (desc.isDynamic()) { + xQueryCount++; + } + } + if (xQueryCount != 1) { + results.add(new ValidationResult.Builder() + .subject("XQueries") + .valid(false) + .explanation("Exactly one XQuery must be set if using destination of " + DESTINATION_CONTENT) + .build()); + } + } + return results; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new 
PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .expressionLanguageSupported(false) + .addValidator(new XQueryValidator()) + .required(false) + .dynamic(true) + .build(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + final List<FlowFile> flowFileBatch = session.get(50); + if (flowFileBatch.isEmpty()) { + return; + } + final ProcessorLog logger = getLogger(); + final Map<String, XQueryExecutable> attributeToXQueryMap = new HashMap<>(); + + final Processor proc = new Processor(false); + final XQueryCompiler comp = proc.newXQueryCompiler(); + + for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { + if (!entry.getKey().isDynamic()) { + continue; + } + final XQueryExecutable exp; + try { + exp = comp.compile(entry.getValue()); + attributeToXQueryMap.put(entry.getKey().getName(), exp); + } catch (SaxonApiException e) { + throw new ProcessException(e); // should not happen because we've already validated the XQuery (in XQueryValidator) + } + } + + final XQueryExecutable slashExpression; + try { + slashExpression = comp.compile("/"); + } catch (SaxonApiException e) { + logger.error("unable to compile XQuery expression due to {}", new Object[]{e}); + session.transfer(flowFileBatch, REL_FAILURE); + return; + } + + final String destination = context.getProperty(DESTINATION).getValue(); + + flowFileLoop: + for (FlowFile flowFile : flowFileBatch) { + if (!isScheduled()) { + session.rollback(); + return; + } + + final ObjectHolder<Throwable> error = new ObjectHolder<>(null); + final ObjectHolder<XdmNode> sourceRef = new ObjectHolder<>(null); + + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream rawIn) throws IOException { + try (final InputStream in = new BufferedInputStream(rawIn)) { + XQueryEvaluator qe = slashExpression.load(); + qe.setSource(new SAXSource(new InputSource(in))); + DocumentBuilderFactory 
dfactory = DocumentBuilderFactory.newInstance(); + dfactory.setNamespaceAware(true); + Document dom = dfactory.newDocumentBuilder().newDocument(); + qe.run(new DOMDestination(dom)); + XdmNode rootNode = proc.newDocumentBuilder().wrap(dom); + sourceRef.set(rootNode); + } catch (final Exception e) { + error.set(e); + } + } + }); + + if (error.get() != null) { + logger.error("unable to evaluate XQuery against {} due to {}; routing to 'failure'", + new Object[]{flowFile, error.get()}); + session.transfer(flowFile, REL_FAILURE); + continue; + } + + final Map<String, String> xQueryResults = new HashMap<>(); + List<FlowFile> childrenFlowFiles = new ArrayList<>(); + + for (final Map.Entry<String, XQueryExecutable> entry : attributeToXQueryMap.entrySet()) { + try { + XQueryEvaluator qe = entry.getValue().load(); + qe.setContextItem(sourceRef.get()); + XdmValue result = qe.evaluate(); + + if (DESTINATION_ATTRIBUTE.equals(destination)) { + int index = 1; + for (XdmItem item : result) { + String value = formatItem(item, context); + String attributeName = entry.getKey(); + if (result.size() > 1) { + attributeName += "." 
+ index++; + } + xQueryResults.put(attributeName, value); + } + } else { // if (DESTINATION_CONTENT.equals(destination)){ + if (result.size() == 0) { + logger.info("Routing {} to 'unmatched'", new Object[]{flowFile}); + session.transfer(flowFile, REL_NO_MATCH); + continue flowFileLoop; + } else if (result.size() == 1) { + final XdmItem item = result.itemAt(0); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream rawOut) throws IOException { + try (final OutputStream out = new BufferedOutputStream(rawOut)) { + writeformattedItem(item, context, out); + } catch (TransformerFactoryConfigurationError | TransformerException e) { + throw new IOException(e); + } + } + }); + } else { + for (final XdmItem item : result) { + FlowFile ff = session.clone(flowFile); + ff = session.write(ff, new OutputStreamCallback() { + @Override + public void process(final OutputStream rawOut) throws IOException { + try (final OutputStream out = new BufferedOutputStream(rawOut)) { + try { + writeformattedItem(item, context, out); + } catch (TransformerFactoryConfigurationError | TransformerException e) { + throw new IOException(e); + } + } + } + }); + childrenFlowFiles.add(ff); + } + } + } + } catch (final SaxonApiException e) { + logger.error("failed to evaluate XQuery for {} for Property {} due to {}; routing to failure", + new Object[]{flowFile, entry.getKey(), e}); + session.transfer(flowFile, REL_FAILURE); + session.remove(childrenFlowFiles); + continue flowFileLoop; + } catch (TransformerFactoryConfigurationError | TransformerException | IOException e) { + logger.error("Failed to write XQuery result for {} due to {}; routing original to 'failure'", new Object[]{ + flowFile, error.get()}); + session.transfer(flowFile, REL_FAILURE); + session.remove(childrenFlowFiles); + continue flowFileLoop; + } + } + + if (DESTINATION_ATTRIBUTE.equals(destination)) { + flowFile = session.putAllAttributes(flowFile, xQueryResults); + final 
Relationship destRel = xQueryResults.isEmpty() ? REL_NO_MATCH : REL_MATCH; + logger.info("Successfully evaluated XQueries against {} and found {} matches; routing to {}", + new Object[]{flowFile, xQueryResults.size(), destRel.getName()}); + session.transfer(flowFile, destRel); + session.getProvenanceReporter().modifyAttributes(flowFile); + } else { // if (DESTINATION_CONTENT.equals(destination)) { + if (!childrenFlowFiles.isEmpty()) { + logger.info("Successfully created {} new FlowFiles from {}; routing all to 'matched'", + new Object[]{childrenFlowFiles.size(), flowFile}); + session.transfer(childrenFlowFiles, REL_MATCH); + session.remove(flowFile); + } else { + logger.info("Successfully updated content for {}; routing to 'matched'", new Object[]{flowFile}); + session.transfer(flowFile, REL_MATCH); + session.getProvenanceReporter().modifyContent(flowFile); + } + } + } // end flowFileLoop + } + + private String formatItem(XdmItem item, ProcessContext context) throws TransformerConfigurationException, + TransformerFactoryConfigurationError, TransformerException, IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + writeformattedItem(item, context, baos); + return baos.toString(); + } + + void writeformattedItem(XdmItem item, ProcessContext context, OutputStream out) throws TransformerConfigurationException, + TransformerFactoryConfigurationError, TransformerException, IOException { + + if (item.isAtomicValue()) { + out.write(item.getStringValue().getBytes(UTF8)); + } else { // item is an XdmNode + XdmNode node = (XdmNode) item; + switch (node.getNodeKind()) { + case DOCUMENT: + case ELEMENT: + Transformer transformer = TransformerFactory.newInstance().newTransformer(); + final Properties props = getTransformerProperties(context); + transformer.setOutputProperties(props); + transformer.transform(node.asSource(), new StreamResult(out)); + break; + default: + out.write(node.getStringValue().getBytes(UTF8)); + } + } + } + + private Properties 
getTransformerProperties(ProcessContext context) { + final String method = context.getProperty(XML_OUTPUT_METHOD).getValue(); + boolean indent = context.getProperty(XML_OUTPUT_INDENT).asBoolean(); + boolean omitDeclaration = context.getProperty(XML_OUTPUT_OMIT_XML_DECLARATION).asBoolean(); + return getTransformerProperties(method, indent, omitDeclaration); + } + + static Properties getTransformerProperties(final String method, final boolean indent, final boolean omitDeclaration) { + final Properties props = new Properties(); + props.setProperty(OutputKeys.METHOD, method); + props.setProperty(OutputKeys.INDENT, indent ? "yes" : "no"); + props.setProperty(OutputKeys.OMIT_XML_DECLARATION, omitDeclaration ? "yes" : "no"); + return props; + } + + private static class XQueryValidator implements Validator { + + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext validationContext) { + try { + final Processor proc = new Processor(false); + final XQueryCompiler comp = proc.newXQueryCompiler(); + String error = null; + try { + comp.compile(input); + } catch (final Exception e) { + error = e.toString(); + } + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(error == null) + .explanation(error) + .build(); + } catch (final Exception e) { + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(false) + .explanation("Unable to initialize XQuery engine due to " + e.toString()) + .build(); + } + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java index 
0000000,ab0b2aa..dda3647 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java @@@ -1,0 -1,358 +1,358 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import java.io.BufferedReader; + import java.io.File; + import java.io.IOException; + import java.io.InputStream; + import java.io.InputStreamReader; + import java.io.OutputStream; + import java.lang.ProcessBuilder.Redirect; + import java.util.ArrayList; + import java.util.Collections; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Set; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.ValidationContext; + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.components.Validator; + import org.apache.nifi.expression.AttributeExpression.ResultType; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.stream.io.BufferedInputStream; + import org.apache.nifi.stream.io.BufferedOutputStream; + import org.apache.nifi.stream.io.StreamUtils; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.EventDriven; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.behavior.EventDriven; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.io.InputStreamCallback; + import org.apache.nifi.processor.io.OutputStreamCallback; + import 
org.apache.nifi.processor.util.StandardValidators; + + import org.apache.commons.io.IOUtils; + import org.apache.commons.lang3.StringUtils; + +
/**
 * <p>
 * This processor executes an external command on the contents of a flow file,
 * and creates a new flow file with the results of the command.
 * </p>
 * <p>
 * <strong>Properties:</strong>
 * </p>
 * <ul>
 * <li><strong>Command Path</strong>
 * <ul>
 * <li>Specifies the command to be executed; if just the name of an executable
 * is provided, it must be in the user's environment PATH.</li>
 * <li>Default value: none</li>
 * <li>Supports expression language: true</li>
 * </ul>
 * </li>
 * <li>Command Arguments
 * <ul>
 * <li>The arguments to supply to the executable delimited by the ';' character.
 * Each argument may be an Expression Language statement.</li>
 * <li>Default value: none</li>
 * <li>Supports expression language: true</li>
 * </ul>
 * </li>
 * <li>Working Directory
 * <ul>
 * <li>The directory to use as the current working directory when executing the
 * command</li>
 * <li>Default value: none (which means whatever NiFi's current working
 * directory is...probably the root of the NiFi installation directory.)</li>
 * <li>Supports expression language: true</li>
 * </ul>
 * </li>
 *
 * </ul>
 *
 * <p>
 * <strong>Relationships:</strong>
 * </p>
 * <ul>
 * <li>original
 * <ul>
 * <li>The destination path for the original incoming flow file</li>
 * </ul>
 * </li>
 * <li>output stream
 * <ul>
 * <li>The destination path for the flow file created from the command's
 * output</li>
 * </ul>
 * </li>
 * </ul>
 *
 * @author unattributed
 */
@EventDriven
@SupportsBatching
@Tags({"command execution", "command", "stream", "execute"})
@CapabilityDescription("Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.")
public class ExecuteStreamCommand extends AbstractProcessor {

    public static final Relationship ORIGINAL_RELATIONSHIP = new Relationship.Builder()
            .name("original")
            .description("FlowFiles that were successfully processed")
            .build();
    public static final Relationship OUTPUT_STREAM_RELATIONSHIP = new Relationship.Builder()
            .name("output stream")
            .description("The destination path for the flow file created from the command's output")
            .build();
    private static final Set<Relationship> RELATIONSHIPS;

    static {
        Set<Relationship> rels = new HashSet<>();
        rels.add(OUTPUT_STREAM_RELATIONSHIP);
        rels.add(ORIGINAL_RELATIONSHIP);
        RELATIONSHIPS = Collections.unmodifiableSet(rels);
    }

    private static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = StandardValidators.createAttributeExpressionLanguageValidator(
            ResultType.STRING, true);
    static final PropertyDescriptor EXECUTION_COMMAND = new PropertyDescriptor.Builder()
            .name("Command Path")
            .description(
                    "Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.")
            .expressionLanguageSupported(true)
            .addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
            .required(true)
            .build();

    static final PropertyDescriptor EXECUTION_ARGUMENTS = new PropertyDescriptor.Builder()
            .name("Command Arguments")
            .description("The arguments to supply to the executable delimited by the ';' character.")
            .expressionLanguageSupported(true)
            .addValidator(new Validator() {

                // Validates each ';'-separated argument on its own and reports the first invalid one.
                @Override
                public ValidationResult validate(String subject, String input, ValidationContext context) {
                    ValidationResult result = new ValidationResult.Builder()
                            .subject(subject)
                            .valid(true)
                            .input(input)
                            .build();
                    String[] args = input.split(";");
                    for (String arg : args) {
                        ValidationResult valResult = ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR.validate(subject, arg, context);
                        if (!valResult.isValid()) {
                            result = valResult;
                            break;
                        }
                    }
                    return result;
                }
            })
            .build();

    static final PropertyDescriptor WORKING_DIR = new PropertyDescriptor.Builder()
            .name("Working Directory")
            .description("The directory to use as the current working directory when executing the command")
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.createDirectoryExistsValidator(true, true))
            .required(false)
            .build();

    private static final List<PropertyDescriptor> PROPERTIES;

    static {
        List<PropertyDescriptor> props = new ArrayList<>();
        props.add(EXECUTION_ARGUMENTS);
        props.add(EXECUTION_COMMAND);
        props.add(WORKING_DIR);
        PROPERTIES = Collections.unmodifiableList(props);
    }

    private ProcessorLog logger;

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    protected void init(ProcessorInitializationContext context) {
        logger = getLogger();
    }

    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /**
     * Runs the configured command with the incoming FlowFile's content piped to its standard input, and
     * captures its standard output as the content of a new child FlowFile.
     * <p>
     * Both FlowFiles receive the attributes {@code execution.status}, {@code execution.command},
     * {@code execution.command.args} and {@code execution.error} (at most the first 4000 characters of the
     * command's standard error).
     * </p>
     *
     * @param context supplies the command, its arguments and the working directory
     * @param session the session the FlowFile is pulled from and both FlowFiles are transferred through
     * @throws ProcessException if the external process cannot be started
     */
    @Override
    public void onTrigger(ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (null == flowFile) {
            return;
        }

        final List<String> args = new ArrayList<>();
        final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(flowFile).getValue();
        args.add(executeCommand);
        final String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).getValue();
        if (!StringUtils.isBlank(commandArguments)) {
            // Each ';'-separated argument is evaluated as its own Expression Language statement.
            for (String arg : commandArguments.split(";")) {
                args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(flowFile).getValue());
            }
        }
        final String workingDir = context.getProperty(WORKING_DIR).evaluateAttributeExpressions(flowFile).getValue();

        final ProcessBuilder builder = new ProcessBuilder();

        logger.debug("Executing and waiting for command {} with arguments {}", new Object[]{executeCommand, commandArguments});
        File dir = null;
        if (!StringUtils.isBlank(workingDir)) {
            dir = new File(workingDir);
            if (!dir.exists() && !dir.mkdirs()) {
                logger.warn("Failed to create working directory {}, using current working directory {}",
                        new Object[]{workingDir, System.getProperty("user.dir")});
                // Honour the fallback the warning announces: a null directory makes ProcessBuilder use the
                // current working directory rather than failing to start in a directory that does not exist.
                dir = null;
            }
        }
        builder.command(args);
        builder.directory(dir);
        builder.redirectInput(Redirect.PIPE);
        builder.redirectOutput(Redirect.PIPE);
        final Process process;
        try {
            process = builder.start();
        } catch (IOException e) {
            logger.error("Could not create external process to run command", e);
            throw new ProcessException(e);
        }
        try (final OutputStream pos = process.getOutputStream();
                final InputStream pis = process.getInputStream();
                final InputStream pes = process.getErrorStream();
                final BufferedInputStream bis = new BufferedInputStream(pis);
                final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pes))) {
            int exitCode = -1;
            final BufferedOutputStream bos = new BufferedOutputStream(pos);
            FlowFile outputStreamFlowFile = session.create(flowFile);
            StdInWriterCallback callback = new StdInWriterCallback(bos, bis, logger, session, outputStreamFlowFile, process);
            session.read(flowFile, callback);
            outputStreamFlowFile = callback.outputStreamFlowFile;
            exitCode = callback.exitCode;
            logger.debug("Execution complete for command: {}. Exited with code: {}", new Object[]{executeCommand, exitCode});

            Map<String, String> attributes = new HashMap<>();

            // Standard error is drained only here, after the callback has waited for the command to exit.
            // NOTE(review): a command that writes a lot to stderr could presumably block on a full pipe
            // before exiting - confirm whether stderr needs to be consumed concurrently.
            final StringBuilder strBldr = new StringBuilder();
            try {
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    strBldr.append(line).append("\n");
                }
            } catch (IOException e) {
                strBldr.append("Unknown...could not read Process's Std Error");
            }
            // Cap the attribute value at 4000 characters.
            int length = Math.min(strBldr.length(), 4000);
            attributes.put("execution.error", strBldr.substring(0, length));

            if (exitCode == 0) {
                logger.info("Transferring flow file {} to 'output stream'", new Object[]{outputStreamFlowFile});
            } else {
                logger.error("Transferring flow file {} to 'output stream'. Executable command {} ended in an error: {}",
                        new Object[]{outputStreamFlowFile, executeCommand, strBldr.toString()});
            }

            attributes.put("execution.status", Integer.toString(exitCode));
            attributes.put("execution.command", executeCommand);
            attributes.put("execution.command.args", commandArguments);
            outputStreamFlowFile = session.putAllAttributes(outputStreamFlowFile, attributes);
            session.transfer(outputStreamFlowFile, OUTPUT_STREAM_RELATIONSHIP);
            logger.info("Transferring flow file {} to original", new Object[]{flowFile});
            flowFile = session.putAllAttributes(flowFile, attributes);
            session.transfer(flowFile, ORIGINAL_RELATIONSHIP);

        } catch (final IOException ex) {
            // could not close Process related streams
            logger.warn("Problem terminating Process {}", new Object[]{process}, ex);
        } finally {
            process.destroy(); // last ditch effort to clean up that process.
        }
    }

    /**
     * Pumps the incoming FlowFile content into the process's standard input on a daemon thread while the
     * calling thread copies the process's standard output into the output FlowFile, then waits for the
     * process to exit.
     */
    static class StdInWriterCallback implements InputStreamCallback {

        final OutputStream stdInWritable;
        final InputStream stdOutReadable;
        final ProcessorLog logger;
        final ProcessSession session;
        final Process process;
        FlowFile outputStreamFlowFile;
        // Starts at -1 so that an interrupted wait is not reported as a successful (0) exit.
        int exitCode = -1;

        public StdInWriterCallback(OutputStream stdInWritable, InputStream stdOutReadable, ProcessorLog logger, ProcessSession session,
                FlowFile outputStreamFlowFile, Process process) {
            this.stdInWritable = stdInWritable;
            this.stdOutReadable = stdOutReadable;
            this.logger = logger;
            this.session = session;
            this.outputStreamFlowFile = outputStreamFlowFile;
            this.process = process;
        }

        /**
         * @param incomingFlowFileIS the content of the incoming FlowFile, forwarded to the process's stdin
         * @throws IOException if copying the process's standard output fails
         */
        @Override
        public void process(final InputStream incomingFlowFileIS) throws IOException {
            outputStreamFlowFile = session.write(outputStreamFlowFile, new OutputStreamCallback() {

                @Override
                public void process(OutputStream out) throws IOException {
                    Thread writerThread = new Thread(new Runnable() {

                        @Override
                        public void run() {
                            try {
                                StreamUtils.copy(incomingFlowFileIS, stdInWritable);
                            } catch (IOException e) {
                                logger.error("Failed to write flow file to stdIn due to {}", new Object[]{e}, e);
                            }
                            // MUST close the output stream to the stdIn so that whatever is reading knows
                            // there is no more data
                            IOUtils.closeQuietly(stdInWritable);
                        }
                    });
                    writerThread.setDaemon(true);
                    writerThread.start();
                    StreamUtils.copy(stdOutReadable, out);
                    try {
                        exitCode = process.waitFor();
                    } catch (InterruptedException e) {
                        logger.warn("Command Execution Process was interrupted", e);
                        // Restore the interrupt flag so callers further up the stack can observe it.
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java ---------------------------------------------------------------------- diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java index 0000000,6f18a01..0ae4747 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java @@@ -1,0 -1,164 +1,164 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import java.io.IOException; + import java.io.OutputStream; + import java.util.ArrayList; + import java.util.Collections; + import java.util.HashSet; + import java.util.List; + import java.util.Random; + import java.util.Set; + import java.util.concurrent.atomic.AtomicReference; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.DataUnit; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.OnScheduled; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.lifecycle.OnScheduled; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processor.io.OutputStreamCallback; + import org.apache.nifi.processor.util.StandardValidators; + +
/**
 * Creates FlowFiles filled with random binary or text data of a configured size.
 * <p>
 * When "Unique FlowFiles" is false the payload is generated once when the processor is scheduled and reused
 * for every FlowFile; when it is true a fresh payload is generated for each FlowFile.
 * </p>
 */
@SupportsBatching
@Tags({"test", "random", "generate"})
@CapabilityDescription("This processor creates FlowFiles of random data and is used for load testing")
public class GenerateFlowFile extends AbstractProcessor {

    // Payload shared by all FlowFiles when "Unique FlowFiles" is false; null otherwise.
    private final AtomicReference<byte[]> data = new AtomicReference<>();

    public static final String DATA_FORMAT_BINARY = "Binary";
    public static final String DATA_FORMAT_TEXT = "Text";

    public static final PropertyDescriptor FILE_SIZE = new PropertyDescriptor.Builder()
            .name("File Size")
            .description("The size of the file that will be used")
            .required(true)
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .build();
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
            .name("Batch Size")
            .description("The number of FlowFiles to be transferred in each invocation")
            .required(true)
            .defaultValue("1")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build();
    public static final PropertyDescriptor DATA_FORMAT = new PropertyDescriptor.Builder()
            .name("Data Format")
            .description("Specifies whether the data should be Text or Binary")
            .required(true)
            .defaultValue(DATA_FORMAT_BINARY)
            .allowableValues(DATA_FORMAT_BINARY, DATA_FORMAT_TEXT)
            .build();
    public static final PropertyDescriptor UNIQUE_FLOWFILES = new PropertyDescriptor.Builder()
            .name("Unique FlowFiles")
            .description("If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles will get the same content but this offers much higher throughput")
            .required(true)
            .allowableValues("true", "false")
            .defaultValue("false")
            .build();

    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();

    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;

    // Alphabet the "Text" data format draws from; every character fits in a single byte.
    private static final char[] TEXT_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()-_=+/?.,';:\"?<>\n\t ".toCharArray();

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(FILE_SIZE);
        descriptors.add(BATCH_SIZE);
        descriptors.add(DATA_FORMAT);
        descriptors.add(UNIQUE_FLOWFILES);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(SUCCESS);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Pre-generates the shared payload when FlowFiles do not have to be unique; clears it otherwise.
     *
     * @param context supplies the "Unique FlowFiles", size and format settings
     */
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        if (context.getProperty(UNIQUE_FLOWFILES).asBoolean()) {
            this.data.set(null);
        } else {
            this.data.set(generateData(context));
        }

    }

    /**
     * Generates one random payload of the configured "File Size".
     *
     * @param context supplies the size and the data format
     * @return random bytes for the Binary format, or random characters from {@code TEXT_CHARS} for Text
     */
    private byte[] generateData(final ProcessContext context) {
        final int byteCount = context.getProperty(FILE_SIZE).asDataSize(DataUnit.B).intValue();

        final Random random = new Random();
        final byte[] array = new byte[byteCount];
        if (context.getProperty(DATA_FORMAT).getValue().equals(DATA_FORMAT_BINARY)) {
            random.nextBytes(array);
        } else {
            for (int i = 0; i < array.length; i++) {
                final int index = random.nextInt(TEXT_CHARS.length);
                array[i] = (byte) TEXT_CHARS[index];
            }
        }

        return array;
    }

    /**
     * Creates "Batch Size" FlowFiles and routes each to {@code success}.
     *
     * @param context supplies the batch size and the "Unique FlowFiles" setting
     * @param session the session the FlowFiles are created in
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        // Property values do not change during one invocation, so read them once instead of per iteration.
        final boolean unique = context.getProperty(UNIQUE_FLOWFILES).asBoolean();
        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
        final byte[] sharedData = unique ? null : this.data.get();

        for (int i = 0; i < batchSize; i++) {
            // "Unique FlowFiles" promises that each generated FlowFile is unique, so a fresh payload is
            // produced per FlowFile rather than once per batch.
            final byte[] data = unique ? generateData(context) : sharedData;

            FlowFile flowFile = session.create();
            if (data.length > 0) {
                flowFile = session.write(flowFile, new OutputStreamCallback() {
                    @Override
                    public void process(final OutputStream out) throws IOException {
                        out.write(data);
                    }
                });
            }

            session.getProvenanceReporter().create(flowFile);
            session.transfer(flowFile, SUCCESS);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java index 
0000000,854ee37..2dabbc6 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java @@@ -1,0 -1,72 +1,72 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import java.util.ArrayList; + import java.util.Collections; + import java.util.List; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.SideEffectFree; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.behavior.SideEffectFree; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processors.standard.util.FTPTransfer; + import org.apache.nifi.processors.standard.util.FileTransfer; + +
/**
 * FTP flavour of {@link GetFileTransfer}: exposes the {@link FTPTransfer} property descriptors and supplies
 * an {@link FTPTransfer} as the transfer implementation.
 */
@SideEffectFree
@Tags({"FTP", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"})
@CapabilityDescription("Fetches files from an FTP Server and creates FlowFiles from them")
public class GetFTP extends GetFileTransfer {

    // Unmodifiable view of the supported descriptors, populated in init().
    private List<PropertyDescriptor> properties;

    /**
     * Builds the ordered list of supported property descriptors.
     *
     * @param context unused
     */
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> supported = new ArrayList<>();
        Collections.addAll(supported,
                FTPTransfer.HOSTNAME,
                FTPTransfer.PORT,
                FTPTransfer.USERNAME,
                FTPTransfer.PASSWORD,
                FTPTransfer.CONNECTION_MODE,
                FTPTransfer.TRANSFER_MODE,
                FTPTransfer.REMOTE_PATH,
                FTPTransfer.FILE_FILTER_REGEX,
                FTPTransfer.PATH_FILTER_REGEX,
                FTPTransfer.POLLING_INTERVAL,
                FTPTransfer.RECURSIVE_SEARCH,
                FTPTransfer.IGNORE_DOTTED_FILES,
                FTPTransfer.DELETE_ORIGINAL,
                FTPTransfer.CONNECTION_TIMEOUT,
                FTPTransfer.DATA_TIMEOUT,
                FTPTransfer.MAX_SELECTS,
                FTPTransfer.REMOTE_POLL_BATCH_SIZE,
                FTPTransfer.USE_NATURAL_ORDERING);
        this.properties = Collections.unmodifiableList(supported);
    }

    /**
     * @return the descriptor list assembled in {@code init}
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * @param context the process context handed to the new transfer
     * @return a new {@link FTPTransfer} bound to {@code context} and this processor's logger
     */
    @Override
    protected FileTransfer getFileTransfer(final ProcessContext context) {
        return new FTPTransfer(context, getLogger());
    }
}
