[
https://issues.apache.org/jira/browse/NIFI-3572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15926196#comment-15926196
]
ASF GitHub Bot commented on NIFI-3572:
--------------------------------------
Github user stevedlawrence commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1575#discussion_r106168512
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PipelineXml.java
---
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.io.IOUtils;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.DynamicRelationship;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import static
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static
org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
+import static
org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.xml.sax.InputSource;
+import javax.xml.transform.sax.SAXSource;
+
+import net.sf.saxon.s9api.DocumentBuilder;
+import net.sf.saxon.s9api.QName;
+import net.sf.saxon.s9api.SaxonApiException;
+import net.sf.saxon.s9api.XdmNode;
+
+import com.xmlcalabash.core.XProcException;
+import com.xmlcalabash.io.ReadablePipe;
+import com.xmlcalabash.io.WritableDocument;
+import com.xmlcalabash.model.RuntimeValue;
+import com.xmlcalabash.model.Serialization;
+import com.xmlcalabash.runtime.XPipeline;
+import com.xmlcalabash.util.Input;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"XML, XProc, XMLCalabash"})
+@CapabilityDescription(
+ "Inserts a FlowFile into a specified XProc XML pipeline, allowing one
to perform " +
+ "complex validations and transformations of XML data within a single
NiFi " +
+ "processor. This processor provides a FlowFile to the primary input
port of the " +
+ "XProc pipeline. It is an error if the XProc pipeline does not define
a primary " +
+ "input port. When data exits the XProc pipeline via one or more output
ports, a " +
+ "FlowFile is created and transferred to a dynamic NiFi relationship
having the " +
+ "same name as the output port. If a failure occurs during XProc
processing, the " +
+ "original FlowFile is transferred to the 'pipeline failure'
relationship and " +
+ "nothing transferred to any of the the dynamic output port
relationships. " +
+ "Dynamic properties may be defined, with their names and values passed
to the " +
+ "XProc pipeline as XProc options. Note that all input and output XML
data " +
+ "reside in memory during XML pipeline processing. If memory usage is a
" +
+ "concern, the XML Pipeline Pool Size property could be used to limit
the " +
+ "amount of XML pipelines that run at a single time to help limit this
memory " +
+ "usage. For more information on XProc, visit
http://www.w3.org/TR/xproc/")
+@DynamicProperty(
+ name = "XProc Option Name",
+ value = "XProc Option Value",
+ supportsExpressionLanguage = true,
+ description = "Option names and values passed to the XProc pipeline.
The dynamic " +
+ "property name must be in Clark-notation, {uri}name, though the
{uri} " +
+ "prefix may be optional depending on the XProc file. The property
name " +
+ "is passed directly to the XProc engine as an option name along
with its " +
+ "associated value.")
+@DynamicRelationship(
+ name = "XProc Output Port",
+ description =
+ "A dynamic relationship is created for each output port defined in
the XProc " +
+ "pipeline. When XML is written to an XProc output port, a FlowFile
is created " +
+ "for the XML, which is transferred to relationship of the same
name. Based on " +
+ "the XProc pipeline, a single input FlowFile could result in
outputs to more " +
+ "than one relationship. If an XProc output port specifies
sequence='true', then " +
+ "multiple FlowFiles could be transferred to the same output
relationship for a " +
+ "single input FlowFile.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "fragment.identifier",
+ description = "All outputs produced from the same input FlowFile
will have the same randomly generated UUID added for this attribute"),
+ @WritesAttribute(attribute = "fragment.index",
+ description = "A one-up number that indicates the ordering of
output port FlowFiles that were created from a single parent FlowFile"),
+ @WritesAttribute(attribute = "fragment.count",
+ description = "The total number of FlowFiles generated from the
input FlowFile"),
+ @WritesAttribute(attribute = "segment.original.filename ",
+ description = "The filename of the input FlowFile")
+})
+public class PipelineXml extends AbstractProcessor {
+
+    // Per-instance processor state.
+    // NOTE(review): inputPort presumably holds the port name returned by
+    // findPrimaryInputPort -- confirm where it is assigned.
+    private String inputPort;
+    private List<PropertyDescriptor> properties;
+    private AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
+    // NOTE(review): looks like a pool of prepared pipelines; it is populated
+    // outside the code visible here -- verify before relying on it.
+    private BlockingQueue<PipelineXmlData> pipelinePool;
+
+    /**
+     * Looks up the primary, non-parameter input port of an XProc pipeline.
+     *
+     * @param pipeline the pipeline whose declared inputs are inspected
+     * @return the first port name returned by {@code pipeline.getInputs()} whose
+     *         declaration is primary and is not a parameter input, or
+     *         {@code null} when no port qualifies
+     */
+    private static String findPrimaryInputPort(XPipeline pipeline) {
+        for (String candidate : pipeline.getInputs()) {
+            if (!pipeline.getDeclareStep().getInput(candidate).getPrimary()) {
+                continue;
+            }
+            if (pipeline.getDeclareStep().getInput(candidate).getParameterInput()) {
+                continue;
+            }
+            return candidate;
+        }
+        return null;
+    }
+
+    /**
+     * Validator shared by the XML Pipeline Config and XML Pipeline File
+     * properties.
+     *
+     * <p>The {@code subject} selects how {@code input} is interpreted: for the
+     * config property the input is handed to {@code PipelineXmlData} as-is, for
+     * the file property it is wrapped in an {@code Input} first. Any other
+     * subject is rejected. The result is invalid when building the pipeline
+     * throws {@code SaxonApiException} or {@code XProcException}, or when the
+     * pipeline has no primary non-parameter input port.
+     */
+    private static final Validator XML_PIPELINE_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(String subject, String input, ValidationContext context) {
+            final ValidationResult.Builder builder = new ValidationResult.Builder();
+            builder.subject(subject).input(input);
+
+            PipelineXmlData pd = null;
+            try {
+                // Compare by value: the subject is a String supplied by the
+                // caller, so reference equality (==) only matches when the very
+                // same String instance happens to be passed in.
+                if (XML_PIPELINE_CONFIG.getName().equals(subject)) {
+                    pd = new PipelineXmlData(input);
+                } else if (XML_PIPELINE_FILE.getName().equals(subject)) {
+                    pd = new PipelineXmlData(new Input(input));
+                } else {
+                    return builder.valid(false).explanation("Can only validate XML Pipeline Config or XML Pipeline File").build();
+                }
+            } catch (SaxonApiException | XProcException e) {
+                return builder.valid(false).explanation("XProc pipeline is invalid: " + e).build();
+            }
+
+            // The processor feeds the FlowFile into the primary input port, so a
+            // pipeline without one cannot be used.
+            final String inputPort = findPrimaryInputPort(pd.pipeline);
+            if (inputPort == null) {
+                return builder.valid(false).explanation("XProc pipeline must define a primary non-parameter input port").build();
+            }
+
+            return builder.valid(true).build();
+        }
+    };
+
+    /**
+     * Validator for dynamic property names that are used as XProc option names.
+     *
+     * <p>The name is accepted when {@code QName.fromClarkName} parses it without
+     * throwing {@code IllegalArgumentException}; the property value is only
+     * recorded as the result's input and is not checked.
+     */
+    private static final Validator XPROC_OPTION_NAME_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(String optName, String optValue, ValidationContext context) {
+            final ValidationResult.Builder result = new ValidationResult.Builder();
+            result.subject(optName).input(optValue);
+
+            try {
+                // Only the parse outcome matters; the QName itself is discarded.
+                QName.fromClarkName(optName);
+            } catch (IllegalArgumentException e) {
+                return result.valid(false).explanation("Option name must be of the form {uri}local.").build();
+            }
+
+            return result.valid(true).build();
+        }
+    };
+
+    // Note that the names of these properties and relationships intentionally
+    // contain spaces so they cannot conflict with XProc option and output port
+    // names, which are restricted to QNames and NCNames.
+    /**
+     * Optional property holding the path of a file that contains the XProc
+     * pipeline. Registers the file-exists validator followed by the pipeline
+     * validator.
+     */
+    public static final PropertyDescriptor XML_PIPELINE_FILE = new PropertyDescriptor.Builder()
+            .name("xml pipeline file")
+            .displayName("XML Pipeline File")
+            .description("Full path to a file containing the XProc XML pipeline configuration. "
+                    + "Only one of XML Pipeline File or XML Pipeline Config may be used.")
+            .required(false)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .addValidator(XML_PIPELINE_VALIDATOR)
+            .build();
+
+    /**
+     * Optional property holding the XProc pipeline definition inline. Registers
+     * the always-valid validator followed by the pipeline validator.
+     */
+    public static final PropertyDescriptor XML_PIPELINE_CONFIG = new PropertyDescriptor.Builder()
+            .name("xml pipeline config")
+            .displayName("XML Pipeline Config")
+            .description("XProc XML pipeline configuration. "
+                    + "Only one of XML Pipeline File or XML Pipeline Config may be used.")
+            .required(false)
+            .addValidator(Validator.VALID)
+            .addValidator(XML_PIPELINE_VALIDATOR)
+            .build();
+ public static final PropertyDescriptor XML_PIPELINE_POOL_SIZE = new
PropertyDescriptor.Builder()
--- End diff --
A new commit was added to this pull request to make this change.
> Add a processor to support XProc XML Pipelines
> ----------------------------------------------
>
> Key: NIFI-3572
> URL: https://issues.apache.org/jira/browse/NIFI-3572
> Project: Apache NiFi
> Issue Type: New Feature
> Reporter: Steve Lawrence
>
> An XProc processor was developed and submitted to the NiFi dev list:
> https://mail-archives.apache.org/mod_mbox/nifi-dev/201703.mbox/%[email protected]%3E
> It would be nice if this could be pulled into NiFi.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)