[ 
https://issues.apache.org/jira/browse/NIFI-3572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15926196#comment-15926196
 ] 

ASF GitHub Bot commented on NIFI-3572:
--------------------------------------

Github user stevedlawrence commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1575#discussion_r106168512
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PipelineXml.java
 ---
    @@ -0,0 +1,451 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.commons.io.IOUtils;
    +
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.DynamicRelationship;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.expression.AttributeExpression;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.StringUtils;
    +
    +import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
    +import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
    +import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
    +import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
    +import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
    +
    +import java.io.InputStream;
    +import java.io.IOException;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.AbstractMap.SimpleEntry;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.xml.sax.InputSource;
    +import javax.xml.transform.sax.SAXSource;
    +
    +import net.sf.saxon.s9api.DocumentBuilder;
    +import net.sf.saxon.s9api.QName;
    +import net.sf.saxon.s9api.SaxonApiException;
    +import net.sf.saxon.s9api.XdmNode;
    +
    +import com.xmlcalabash.core.XProcException;
    +import com.xmlcalabash.io.ReadablePipe;
    +import com.xmlcalabash.io.WritableDocument;
    +import com.xmlcalabash.model.RuntimeValue;
    +import com.xmlcalabash.model.Serialization;
    +import com.xmlcalabash.runtime.XPipeline;
    +import com.xmlcalabash.util.Input;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"XML, XProc, XMLCalabash"})
    +@CapabilityDescription(
    +    "Inserts a FlowFile into a specified XProc XML pipeline, allowing one 
to perform " +
    +    "complex validations and transformations of XML data within a single 
NiFi " +
    +    "processor. This processor provides a FlowFile to the primary input 
port of the " +
    +    "XProc pipeline. It is an error if the XProc pipeline does not define 
a primary " +
    +    "input port. When data exits the XProc pipeline via one or more output 
ports, a " +
    +    "FlowFile is created and transferred to a dynamic NiFi relationship 
having the " +
    +    "same name as the output port. If a failure occurs during XProc 
processing, the " +
    +    "original FlowFile is transferred to the 'pipeline failure' 
relationship and " +
    +    "nothing is transferred to any of the dynamic output port 
relationships. " +
    +    "Dynamic properties may be defined, with their names and values passed 
to the " +
    +    "XProc pipeline as XProc options. Note that all input and output XML 
data " +
    +    "reside in memory during XML pipeline processing. If memory usage is a 
" +
    +    "concern, the XML Pipeline Pool Size property could be used to limit 
the " +
    +    "amount of XML pipelines that run at a single time to help limit this 
memory " +
    +    "usage. For more information on XProc, visit 
http://www.w3.org/TR/xproc/")
    +@DynamicProperty(
    +    name = "XProc Option Name",
    +    value = "XProc Option Value",
    +    supportsExpressionLanguage = true,
    +    description = "Option names and values passed to the XProc pipeline. 
The dynamic " +
    +        "property name must be in Clark-notation, {uri}name, though the 
{uri} " +
    +        "prefix may be optional depending on the XProc file. The property 
name " +
    +        "is passed directly to the XProc engine as an option name along 
with its " +
    +        "associated value.")
    +@DynamicRelationship(
    +    name = "XProc Output Port",
    +    description =
    +        "A dynamic relationship is created for each output port defined in 
the XProc " +
    +        "pipeline. When XML is written to an XProc output port, a FlowFile 
is created " +
    +        "for the XML, which is transferred to relationship of the same 
name. Based on " +
    +        "the XProc pipeline, a single input FlowFile could result in 
outputs to more " +
    +        "than one relationship. If an XProc output port specifies 
sequence='true', then " +
    +        "multiple FlowFiles could be transferred to the same output 
relationship for a " +
    +        "single input FlowFile.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "fragment.identifier",
    +        description = "All outputs produced from the same input FlowFile 
will have the same randomly generated UUID added for this attribute"),
    +    @WritesAttribute(attribute = "fragment.index",
    +        description = "A one-up number that indicates the ordering of 
output port FlowFiles that were created from a single parent FlowFile"),
    +    @WritesAttribute(attribute = "fragment.count",
    +        description = "The total number of FlowFiles generated from the 
input FlowFile"),
    +    @WritesAttribute(attribute = "segment.original.filename ",
    +        description = "The filename of the input FlowFile")
    +})
    +public class PipelineXml extends AbstractProcessor {
    +
    +    // Name of an XProc input port; presumably the primary input port located by
    +    // findPrimaryInputPort() -- TODO confirm against the rest of the class
    +    private String inputPort = null;
    +    // Property descriptors of this processor (populated outside this excerpt)
    +    private List<PropertyDescriptor> properties;
    +    // Relationship set wrapped in an AtomicReference; presumably swapped as a whole
    +    // when the pipeline's output ports change -- TODO confirm
    +    private AtomicReference<Set<Relationship>> relationships = new 
AtomicReference<>();
    +    // Queue of PipelineXmlData instances; presumably the pool limited by the
    +    // XML Pipeline Pool Size property -- TODO confirm
    +    private BlockingQueue<PipelineXmlData> pipelinePool = null;
    +
    +    private static String findPrimaryInputPort(XPipeline pipeline) {
    +        for (String port : pipeline.getInputs()) {
    +            if (pipeline.getDeclareStep().getInput(port).getPrimary() &&
    +               
!pipeline.getDeclareStep().getInput(port).getParameterInput()) {
    +                return port;
    +            }
    +        }
    +        return null;
    +    }
    +
    +    /**
    +     * Validates the value of the XML Pipeline Config or XML Pipeline File property.
    +     * <p>
    +     * The {@code subject} selects how {@code input} is interpreted: for the config
    +     * property it is passed to {@code PipelineXmlData} as a String, for the file
    +     * property it is wrapped in an {@code Input} first. Any other subject is rejected.
    +     * The result is invalid if constructing the pipeline data throws
    +     * {@link SaxonApiException} or {@link XProcException}, or if the pipeline
    +     * has no primary non-parameter input port.
    +     */
    +    private static final Validator XML_PIPELINE_VALIDATOR = new Validator() {
    +        /**
    +         * @param subject name of the property being validated
    +         * @param input   the property value (pipeline XML or pipeline file location)
    +         * @param context validation context (unused)
    +         * @return a valid result only when the pipeline loads and defines a primary
    +         *         non-parameter input port
    +         */
    +        @Override
    +        public ValidationResult validate(String subject, String input, ValidationContext context) {
    +            final ValidationResult.Builder builder = new ValidationResult.Builder();
    +            builder.subject(subject).input(input);
    +
    +            PipelineXmlData pd = null;
    +            try {
    +                // Compare names by value: '==' on Strings is reference equality and only
    +                // succeeds when the caller happens to pass the very same String instance.
    +                // Calling equals() on the property name also tolerates a null subject.
    +                if (XML_PIPELINE_CONFIG.getName().equals(subject)) {
    +                    pd = new PipelineXmlData(input);
    +                } else if (XML_PIPELINE_FILE.getName().equals(subject)) {
    +                    pd = new PipelineXmlData(new Input(input));
    +                } else {
    +                    return builder.valid(false).explanation("Can only validate XML Pipeline Config or XML Pipeline File").build();
    +                }
    +            } catch (SaxonApiException | XProcException e) {
    +                return builder.valid(false).explanation("XProc pipeline is invalid: " + e).build();
    +            }
    +
    +            final String inputPort = findPrimaryInputPort(pd.pipeline);
    +            if (inputPort == null) {
    +                return builder.valid(false).explanation("XProc pipeline must define a primary non-parameter input port").build();
    +            }
    +
    +            return builder.valid(true).build();
    +        }
    +    };
    +
    +    /**
    +     * Validates the name of a dynamic property used as an XProc option: the name
    +     * is accepted when {@code QName.fromClarkName} parses it without throwing
    +     * {@link IllegalArgumentException}. The option value is recorded on the result
    +     * but not checked.
    +     */
    +    private static final Validator XPROC_OPTION_NAME_VALIDATOR = new Validator() {
    +        @Override
    +        public ValidationResult validate(String optName, String optValue, ValidationContext context) {
    +            final ValidationResult.Builder result = new ValidationResult.Builder();
    +            result.subject(optName).input(optValue);
    +
    +            // the parsed QName itself is discarded; only success or failure matters
    +            boolean clarkNameParsed;
    +            try {
    +                QName.fromClarkName(optName);
    +                clarkNameParsed = true;
    +            } catch (IllegalArgumentException e) {
    +                clarkNameParsed = false;
    +            }
    +
    +            if (!clarkNameParsed) {
    +                result.valid(false).explanation("Option name must be of the form {uri}local.");
    +                return result.build();
    +            }
    +
    +            result.valid(true);
    +            return result.build();
    +        }
    +    };
    +
    +    // note that the names of these properties and relationships 
intentionally
    +    // have spaces so they cannot conflict with XProc option and output 
port
    +    // names, which are restricted to QName's and NCName's
    +    /**
    +     * Optional property naming a file that contains the XProc pipeline definition.
    +     * The value is checked by StandardValidators.FILE_EXISTS_VALIDATOR and by
    +     * XML_PIPELINE_VALIDATOR. The internal name deliberately contains spaces
    +     * (see the note preceding these descriptors).
    +     */
    +    public static final PropertyDescriptor XML_PIPELINE_FILE = new 
PropertyDescriptor.Builder()
    +            .name("xml pipeline file")
    +            .displayName("XML Pipeline File")
    +            .description("Full path to a file containing the XProc XML 
pipeline configuration. Only one of XML Pipeline File or XML Pipeline Config 
may be used.")
    +            .required(false)
    +            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
    +            .addValidator(XML_PIPELINE_VALIDATOR)
    +            .build();
    +
    +    /**
    +     * Optional property holding the XProc pipeline definition inline as text.
    +     * Validator.VALID accepts any value; XML_PIPELINE_VALIDATOR then checks the
    +     * pipeline itself.
    +     */
    +    public static final PropertyDescriptor XML_PIPELINE_CONFIG = new 
PropertyDescriptor.Builder()
    +            .name("xml pipeline config")
    +            .displayName("XML Pipeline Config")
    +            .description("XProc XML pipeline configuration. Only one of 
XML Pipeline File or XML Pipeline Config may be used.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .addValidator(XML_PIPELINE_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor XML_PIPELINE_POOL_SIZE = new 
PropertyDescriptor.Builder()
    --- End diff --
    
    A new commit was added to this pull request to make this change.


> Add a processor to support XProc XML Pipelines
> ----------------------------------------------
>
>                 Key: NIFI-3572
>                 URL: https://issues.apache.org/jira/browse/NIFI-3572
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Steve Lawrence
>
> An XProc processor was developed and submitted to the NiFi dev list:
> https://mail-archives.apache.org/mod_mbox/nifi-dev/201703.mbox/%[email protected]%3E
> It would be nice if this could be pulled into NiFi.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to