[ 
https://issues.apache.org/jira/browse/NIFI-3572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928175#comment-15928175
 ] 

ASF GitHub Bot commented on NIFI-3572:
--------------------------------------

Github user stevedlawrence commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1575#discussion_r106432521
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PipelineXml.java
 ---
    @@ -0,0 +1,451 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.commons.io.IOUtils;
    +
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.DynamicRelationship;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.expression.AttributeExpression;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.StringUtils;
    +
    +import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
    +import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
    +import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
    +import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
    +import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
    +
    +import java.io.InputStream;
    +import java.io.IOException;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.AbstractMap.SimpleEntry;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.xml.sax.InputSource;
    +import javax.xml.transform.sax.SAXSource;
    +
    +import net.sf.saxon.s9api.DocumentBuilder;
    +import net.sf.saxon.s9api.QName;
    +import net.sf.saxon.s9api.SaxonApiException;
    +import net.sf.saxon.s9api.XdmNode;
    +
    +import com.xmlcalabash.core.XProcException;
    +import com.xmlcalabash.io.ReadablePipe;
    +import com.xmlcalabash.io.WritableDocument;
    +import com.xmlcalabash.model.RuntimeValue;
    +import com.xmlcalabash.model.Serialization;
    +import com.xmlcalabash.runtime.XPipeline;
    +import com.xmlcalabash.util.Input;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"XML", "XProc", "XMLCalabash"})
    +@CapabilityDescription(
    +    "Inserts a FlowFile into a specified XProc XML pipeline, allowing one 
to perform " +
    +    "complex validations and transformations of XML data within a single 
NiFi " +
    +    "processor. This processor provides a FlowFile to the primary input 
port of the " +
    +    "XProc pipeline. It is an error if the XProc pipeline does not define 
a primary " +
    +    "input port. When data exits the XProc pipeline via one or more output 
ports, a " +
    +    "FlowFile is created and transferred to a dynamic NiFi relationship 
having the " +
    +    "same name as the output port. If a failure occurs during XProc 
processing, the " +
    +    "original FlowFile is transferred to the 'pipeline failure' 
relationship and " +
    +    "nothing transferred to any of the dynamic output port 
relationships. " +
    +    "Dynamic properties may be defined, with their names and values passed 
to the " +
    +    "XProc pipeline as XProc options. Note that all input and output XML 
data " +
    +    "reside in memory during XML pipeline processing. If memory usage is a 
" +
    +    "concern, the XML Pipeline Pool Size property could be used to limit 
the " +
    +    "amount of XML pipelines that run at a single time to help limit this 
memory " +
    +    "usage. For more information on XProc, visit 
http://www.w3.org/TR/xproc/";)
    +@DynamicProperty(
    +    name = "XProc Option Name",
    +    value = "XProc Option Value",
    +    supportsExpressionLanguage = true,
    +    description = "Option names and values passed to the XProc pipeline. 
The dynamic " +
    +        "property name must be in Clark-notation, {uri}name, though the 
{uri} " +
    +        "prefix may be optional depending on the XProc file. The property 
name " +
    +        "is passed directly to the XProc engine as an option name along 
with its " +
    +        "associated value.")
    +@DynamicRelationship(
    +    name = "XProc Output Port",
    +    description =
    +        "A dynamic relationship is created for each output port defined in 
the XProc " +
    +        "pipeline. When XML is written to an XProc output port, a FlowFile 
is created " +
    +        "for the XML, which is transferred to relationship of the same 
name. Based on " +
    +        "the XProc pipeline, a single input FlowFile could result in 
outputs to more " +
    +        "than one relationship. If an XProc output port specifies 
sequence='true', then " +
    +        "multiple FlowFiles could be transferred to the same output 
relationship for a " +
    +        "single input FlowFile.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "fragment.identifier",
    +        description = "All outputs produced from the same input FlowFile 
will have the same randomly generated UUID added for this attribute"),
    +    @WritesAttribute(attribute = "fragment.index",
    +        description = "A one-up number that indicates the ordering of 
output port FlowFiles that were created from a single parent FlowFile"),
    +    @WritesAttribute(attribute = "fragment.count",
    +        description = "The total number of FlowFiles generated from the 
input FlowFile"),
    +    @WritesAttribute(attribute = "segment.original.filename",
    +        description = "The filename of the input FlowFile")
    +})
    +public class PipelineXml extends AbstractProcessor {
    +
    +    private String inputPort = null;
    +    private List<PropertyDescriptor> properties;
    +    private AtomicReference<Set<Relationship>> relationships = new 
AtomicReference<>();
    +    private BlockingQueue<PipelineXmlData> pipelinePool = null;
    +
    +    private static String findPrimaryInputPort(XPipeline pipeline) {
    +        for (String port : pipeline.getInputs()) {
    +            if (pipeline.getDeclareStep().getInput(port).getPrimary() &&
    +               
!pipeline.getDeclareStep().getInput(port).getParameterInput()) {
    +                return port;
    +            }
    +        }
    +        return null;
    +    }
    +
    +    private static final Validator XML_PIPELINE_VALIDATOR = new 
Validator() {
    +        @Override
    +        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
    +            final ValidationResult.Builder builder = new 
ValidationResult.Builder();
    +            builder.subject(subject).input(input);
    +
    +            PipelineXmlData pd = null;
    +            try {
    +                if (subject == XML_PIPELINE_CONFIG.getName()) {
    +                    pd = new PipelineXmlData(input);
    +                } else if (subject == XML_PIPELINE_FILE.getName()) {
    +                    pd = new PipelineXmlData(new Input(input));
    +                } else {
    +                    return builder.valid(false).explanation("Can only 
validate XML Pipeline Config or XML Pipeline File").build();
    +                }
    +            } catch (SaxonApiException | XProcException e) {
    +                return builder.valid(false).explanation("XProc pipeline is 
invalid: " + e).build();
    +            }
    +
    +            final String inputPort = findPrimaryInputPort(pd.pipeline);
    +            if (inputPort == null) {
    +                return builder.valid(false).explanation("XProc pipeline 
must define a primary non-parameter input port").build();
    +            }
    +
    +            return builder.valid(true).build();
    +        }
    +    };
    +
    +    private static final Validator XPROC_OPTION_NAME_VALIDATOR = new 
Validator() {
    +        @Override
    +        public ValidationResult validate(String optName, String optValue, 
ValidationContext context) {
    +            final ValidationResult.Builder builder = new 
ValidationResult.Builder();
    +            builder.subject(optName).input(optValue);
    +
    +            try {
    +                QName.fromClarkName(optName);
    +                builder.valid(true);
    +            } catch (IllegalArgumentException e) {
    +                builder.valid(false).explanation("Option name must be of 
the form {uri}local.");
    +            }
    +
    +            return builder.build();
    +        }
    +    };
    +
    +    // note that the names of these properties and relationships 
intentionally
    +    // have spaces so they cannot conflict with XProc option and output 
port
    +    // names, which are restricted to QName's and NCName's
    +    public static final PropertyDescriptor XML_PIPELINE_FILE = new 
PropertyDescriptor.Builder()
    +            .name("xml pipeline file")
    +            .displayName("XML Pipeline File")
    +            .description("Full path to a file containing the XProc XML 
pipeline configuration. Only one of XML Pipeline File or XML Pipeline Config 
may be used.")
    +            .required(false)
    +            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
    +            .addValidator(XML_PIPELINE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor XML_PIPELINE_CONFIG = new 
PropertyDescriptor.Builder()
    +            .name("xml pipeline config")
    +            .displayName("XML Pipeline Config")
    +            .description("XProc XML pipeline configuration. Only one of 
XML Pipeline File or XML Pipeline Config may be used.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .addValidator(XML_PIPELINE_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor XML_PIPELINE_POOL_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("xml pipeline pool size")
    +            .displayName("XML Pipeline Pool Size")
    +            .description("The library used to run the XML pipeline is not 
thread safe. To allow for processing multiple " +
    +                "FlowFiles in separate threads, this processor creates a 
pool of separate pipeline instances. This " +
    +                "value defines the size of this pool and the number of 
pipeline instances to create. Note that this " +
    +                "value effectively defines the maximum number of FlowFiles 
that can be processed at the same time in " +
    +                "separate threads, regardless of the number of threads. A 
higher number allows more parallel " +
    +                "pipeline processing, but at the expense of additional 
memory used for each pipeline instance and the " +
    +                "XML input/output data that traverses throughout the 
pipeline.")
    +            .required(true)
    +            .defaultValue("10")
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_PIPELINE_FAILURE = new 
Relationship.Builder()
    +        .name("pipeline failure")
    +        .description("FlowFiles that fail XProc processing are routed 
here")
    +        .build();
    +    public static final Relationship REL_PIPELINE_ORIGINAL = new 
Relationship.Builder()
    +        .name("original xml")
    +        .description("FlowFiles that successfully complete XProc 
processing are routed here")
    +        .build();
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(XML_PIPELINE_FILE);
    +        properties.add(XML_PIPELINE_CONFIG);
    +        properties.add(XML_PIPELINE_POOL_SIZE);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> set = new HashSet<>();
    +        set.add(REL_PIPELINE_FAILURE);
    +        set.add(REL_PIPELINE_ORIGINAL);
    +        relationships = new AtomicReference<>(set);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships.get();
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor 
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .name(propertyDescriptorName)
    +                .expressionLanguageSupported(true)
    +                .required(false)
    +                .addValidator(XPROC_OPTION_NAME_VALIDATOR)
    +                
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING,
 true))
    +                .dynamic(true)
    +                .build();
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
    +        Set<ValidationResult> results = new HashSet<>();
    +
    +        final Set<Relationship> newRelationships = new HashSet<>();
    +        newRelationships.add(REL_PIPELINE_FAILURE);
    +        newRelationships.add(REL_PIPELINE_ORIGINAL);
    +
    +        Map<PropertyDescriptor, String> propertyMap = 
validationContext.getProperties();
    +        final String pipelineFile = propertyMap.get(XML_PIPELINE_FILE);
    +        final String pipelineConfig = propertyMap.get(XML_PIPELINE_CONFIG);
    +
    +        if (StringUtils.isEmpty(pipelineFile) == 
StringUtils.isEmpty(pipelineConfig)) {
    +            results.add(new 
ValidationResult.Builder().valid(false).explanation(
    +                "Exactly one of XML Pipeline File or XML Pipeline Config 
must be set").build());
    +            this.relationships.set(newRelationships);
    +            return results;
    +        }
    +
    +        PipelineXmlData pd = null;
    +        try {
    +            if (!StringUtils.isEmpty(pipelineFile)) {
    +                pd = new PipelineXmlData(new Input(pipelineFile));
    +            } else {
    +                pd = new PipelineXmlData(pipelineConfig);
    +            }
    +        } catch (Exception e) {
    +            // shouldn't be possible, this should have been validated in 
the individual validators
    +            results.add(new 
ValidationResult.Builder().valid(false).explanation(
    +                "Failed to parse pipeline data: " + e).build());
    +            this.relationships.set(newRelationships);
    +            return results;
    +        }
    +
    +        inputPort = findPrimaryInputPort(pd.pipeline);
    +
    +        // we know everything is valid, so lets add the new output port 
relationships
    +        final Set<String> outputPorts = pd.pipeline.getOutputs();
    +        for (final String outputPort: outputPorts) {
    +            final Relationship outputRel = new Relationship.Builder()
    +                .name(outputPort)
    +                .description("The XProc output port named '" + outputPort 
+ "'")
    +                .build();
    +            newRelationships.add(outputRel);
    +        }
    +
    +        this.relationships.set(newRelationships);
    +
    +        return results;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) throws 
SaxonApiException {
    +        final String pipelineFile = 
context.getProperty(XML_PIPELINE_FILE).getValue();
    +        final String pipelineConfig = 
context.getProperty(XML_PIPELINE_CONFIG).getValue();
    +        final int pipelinePoolSize = 
context.getProperty(XML_PIPELINE_POOL_SIZE).asInteger();
    +        pipelinePool = new ArrayBlockingQueue<>(pipelinePoolSize);
    +        for (int i = 0; i < pipelinePoolSize; i++) {
    +            final PipelineXmlData pd;
    +            if (!StringUtils.isEmpty(pipelineFile)) {
    +                pd = new PipelineXmlData(new Input(pipelineFile));
    +            } else {
    +                pd = new PipelineXmlData(pipelineConfig);
    +            }
    +            pipelinePool.add(pd);
    +        }
    +    }
    +
    +    @OnStopped
    +    public void onStopped(final ProcessContext context) {
    +        pipelinePool.clear();
    +        pipelinePool = null;
    +    }
    +
    +    private void handleInput(PipelineXmlData pd, ProcessContext context, 
FlowFile original, InputStream stream) throws SaxonApiException {
    +        XdmNode inputNode = pd.runtime.parse(new InputSource(stream));
    +        pd.pipeline.writeTo(inputPort, inputNode);
    +
    +        for (final Map.Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
    +            if (!entry.getKey().isDynamic()) {
    +                continue;
    +            }
    +            final QName optName = 
QName.fromClarkName(entry.getKey().getName());
    +            final String value = 
context.newPropertyValue(entry.getValue()).evaluateAttributeExpressions(original).getValue();
    +            pd.pipeline.passOption(optName, new RuntimeValue(value));
    +        }
    +    }
    +
    +    private void handleOutput(PipelineXmlData pd, ProcessSession session, 
FlowFile original, List<Map.Entry<Relationship, FlowFile>> outputs) throws 
SaxonApiException {
    +        int fragmentCount = 0;
    +
    +        for (final Relationship rel: getRelationships()) {
    +            if (rel == REL_PIPELINE_FAILURE || rel == 
REL_PIPELINE_ORIGINAL) {
    +                continue;
    +            }
    +
    +            final ReadablePipe rpipe = pd.pipeline.readFrom(rel.getName());
    +            final Serialization serial = 
pd.pipeline.getSerialization(rel.getName());
    +
    +            while (rpipe.moreDocuments()) {
    +                final XdmNode node = rpipe.read();
    +
    +                FlowFile outputFF = session.create(original);
    +                outputFF = session.write(outputFF, out -> {
    +                    final WritableDocument wd = new 
WritableDocument(pd.runtime, null, serial, out);
    +                    wd.write(node);
    +                });
    +
    +                outputFF = session.putAttribute(outputFF, 
FRAGMENT_INDEX.key(), String.valueOf(fragmentCount++));
    +                outputs.add(new SimpleEntry(rel, outputFF));
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
    +        PipelineXmlData pd = null;
    +        try {
    +            pd = pipelinePool.take();
    +        } catch (InterruptedException e) {
    +            return;
    --- End diff --
    
    I suspect that instead of returning, this should call 
Thread.currentThread().interrupt() to restore the thread's interrupt status, 
so that callers (e.g. the framework's scheduler) can still detect the 
interruption.


> Add a processor to support XProc XML Pipelines
> ----------------------------------------------
>
>                 Key: NIFI-3572
>                 URL: https://issues.apache.org/jira/browse/NIFI-3572
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Steve Lawrence
>
> An XProc processor was developed and submitted to the NiFi dev list:
> https://mail-archives.apache.org/mod_mbox/nifi-dev/201703.mbox/%[email protected]%3E
> It would be nice if this could be pulled into NiFi.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to