[
https://issues.apache.org/jira/browse/NIFI-3572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928175#comment-15928175
]
ASF GitHub Bot commented on NIFI-3572:
--------------------------------------
Github user stevedlawrence commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1575#discussion_r106432521
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PipelineXml.java
---
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.io.IOUtils;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.DynamicRelationship;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import static
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static
org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
+import static
org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.xml.sax.InputSource;
+import javax.xml.transform.sax.SAXSource;
+
+import net.sf.saxon.s9api.DocumentBuilder;
+import net.sf.saxon.s9api.QName;
+import net.sf.saxon.s9api.SaxonApiException;
+import net.sf.saxon.s9api.XdmNode;
+
+import com.xmlcalabash.core.XProcException;
+import com.xmlcalabash.io.ReadablePipe;
+import com.xmlcalabash.io.WritableDocument;
+import com.xmlcalabash.model.RuntimeValue;
+import com.xmlcalabash.model.Serialization;
+import com.xmlcalabash.runtime.XPipeline;
+import com.xmlcalabash.util.Input;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"XML, XProc, XMLCalabash"})
+@CapabilityDescription(
+ "Inserts a FlowFile into a specified XProc XML pipeline, allowing one
to perform " +
+ "complex validations and transformations of XML data within a single
NiFi " +
+ "processor. This processor provides a FlowFile to the primary input
port of the " +
+ "XProc pipeline. It is an error if the XProc pipeline does not define
a primary " +
+ "input port. When data exits the XProc pipeline via one or more output
ports, a " +
+ "FlowFile is created and transferred to a dynamic NiFi relationship
having the " +
+ "same name as the output port. If a failure occurs during XProc
processing, the " +
+ "original FlowFile is transferred to the 'pipeline failure'
relationship and " +
+ "nothing transferred to any of the the dynamic output port
relationships. " +
+ "Dynamic properties may be defined, with their names and values passed
to the " +
+ "XProc pipeline as XProc options. Note that all input and output XML
data " +
+ "reside in memory during XML pipeline processing. If memory usage is a
" +
+ "concern, the XML Pipeline Pool Size property could be used to limit
the " +
+ "amount of XML pipelines that run at a single time to help limit this
memory " +
+ "usage. For more information on XProc, visit
http://www.w3.org/TR/xproc/")
+@DynamicProperty(
+ name = "XProc Option Name",
+ value = "XProc Option Value",
+ supportsExpressionLanguage = true,
+ description = "Option names and values passed to the XProc pipeline.
The dynamic " +
+ "property name must be in Clark-notation, {uri}name, though the
{uri} " +
+ "prefix may be optional depending on the XProc file. The property
name " +
+ "is passed directly to the XProc engine as an option name along
with its " +
+ "associated value.")
+@DynamicRelationship(
+ name = "XProc Output Port",
+ description =
+ "A dynamic relationship is created for each output port defined in
the XProc " +
+ "pipeline. When XML is written to an XProc output port, a FlowFile
is created " +
+ "for the XML, which is transferred to relationship of the same
name. Based on " +
+ "the XProc pipeline, a single input FlowFile could result in
outputs to more " +
+ "than one relationship. If an XProc output port specifies
sequence='true', then " +
+ "multiple FlowFiles could be transferred to the same output
relationship for a " +
+ "single input FlowFile.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "fragment.identifier",
+ description = "All outputs produced from the same input FlowFile
will have the same randomly generated UUID added for this attribute"),
+ @WritesAttribute(attribute = "fragment.index",
+ description = "A one-up number that indicates the ordering of
output port FlowFiles that were created from a single parent FlowFile"),
+ @WritesAttribute(attribute = "fragment.count",
+ description = "The total number of FlowFiles generated from the
input FlowFile"),
+ @WritesAttribute(attribute = "segment.original.filename ",
+ description = "The filename of the input FlowFile")
+})
+public class PipelineXml extends AbstractProcessor {
+
+ private String inputPort = null;
+ private List<PropertyDescriptor> properties;
+ private AtomicReference<Set<Relationship>> relationships = new
AtomicReference<>();
+ private BlockingQueue<PipelineXmlData> pipelinePool = null;
+
+ private static String findPrimaryInputPort(XPipeline pipeline) {
+ for (String port : pipeline.getInputs()) {
+ if (pipeline.getDeclareStep().getInput(port).getPrimary() &&
+
!pipeline.getDeclareStep().getInput(port).getParameterInput()) {
+ return port;
+ }
+ }
+ return null;
+ }
+
+ private static final Validator XML_PIPELINE_VALIDATOR = new
Validator() {
+ @Override
+ public ValidationResult validate(String subject, String input,
ValidationContext context) {
+ final ValidationResult.Builder builder = new
ValidationResult.Builder();
+ builder.subject(subject).input(input);
+
+ PipelineXmlData pd = null;
+ try {
+ if (subject == XML_PIPELINE_CONFIG.getName()) {
+ pd = new PipelineXmlData(input);
+ } else if (subject == XML_PIPELINE_FILE.getName()) {
+ pd = new PipelineXmlData(new Input(input));
+ } else {
+ return builder.valid(false).explanation("Can only
validate XML Pipeline Config or XML Pipeline File").build();
+ }
+ } catch (SaxonApiException | XProcException e) {
+ return builder.valid(false).explanation("XProc pipeline is
invalid: " + e).build();
+ }
+
+ final String inputPort = findPrimaryInputPort(pd.pipeline);
+ if (inputPort == null) {
+ return builder.valid(false).explanation("XProc pipeline
must define a primary non-parameter input port").build();
+ }
+
+ return builder.valid(true).build();
+ }
+ };
+
+ private static final Validator XPROC_OPTION_NAME_VALIDATOR = new
Validator() {
+ @Override
+ public ValidationResult validate(String optName, String optValue,
ValidationContext context) {
+ final ValidationResult.Builder builder = new
ValidationResult.Builder();
+ builder.subject(optName).input(optValue);
+
+ try {
+ QName.fromClarkName(optName);
+ builder.valid(true);
+ } catch (IllegalArgumentException e) {
+ builder.valid(false).explanation("Option name must be of
the form {uri}local.");
+ }
+
+ return builder.build();
+ }
+ };
+
+ // note that the names of these properties and relationships
intentionally
+ // have spaces so they cannot conflict with XProc option and output
port
+ // names, which are restricted to QName's and NCName's
+ public static final PropertyDescriptor XML_PIPELINE_FILE = new
PropertyDescriptor.Builder()
+ .name("xml pipeline file")
+ .displayName("XML Pipeline File")
+ .description("Full path to a file containing the XProc XML
pipeline configuration. Only one of XML Pipeline File or XML Pipeline Config
may be used.")
+ .required(false)
+ .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+ .addValidator(XML_PIPELINE_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor XML_PIPELINE_CONFIG = new
PropertyDescriptor.Builder()
+ .name("xml pipeline config")
+ .displayName("XML Pipeline Config")
+ .description("XProc XML pipeline configuration. Only one of
XML Pipeline File or XML Pipeline Config may be used.")
+ .required(false)
+ .addValidator(Validator.VALID)
+ .addValidator(XML_PIPELINE_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor XML_PIPELINE_POOL_SIZE = new
PropertyDescriptor.Builder()
+ .name("xml pipeline pool size")
+ .displayName("XML Pipeline Pool Size")
+ .description("The library used to run the XML pipeline is not
thread safe. To allow for processing multiple " +
+ "FlowFiles in separate threads, this processor creates a
pool of separate pipeline instances. This " +
+ "value defines the size of this pool and the number of
pipeline instances to create. Note that this " +
+ "value effectively defines the maximum number of FlowFiles
that can be processed at the same time in " +
+ "separate threads, regardless of the number of threads. A
higher number allows more parrallel " +
+ "pipeline processing, but at the expense of additional
memory used for each pipeline instance and the " +
+ "XML input/output data that traverses throughout the
pipeline.")
+ .required(true)
+ .defaultValue("10")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ public static final Relationship REL_PIPELINE_FAILURE = new
Relationship.Builder()
+ .name("pipeline failure")
+ .description("FlowFiles that fail XProc processing are routed
here")
+ .build();
+ public static final Relationship REL_PIPELINE_ORIGINAL = new
Relationship.Builder()
+ .name("original xml")
+ .description("FlowFiles that successfully complete XProc
processing are routed here")
+ .build();
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(XML_PIPELINE_FILE);
+ properties.add(XML_PIPELINE_CONFIG);
+ properties.add(XML_PIPELINE_POOL_SIZE);
+ this.properties = Collections.unmodifiableList(properties);
+
+ final Set<Relationship> set = new HashSet<>();
+ set.add(REL_PIPELINE_FAILURE);
+ set.add(REL_PIPELINE_ORIGINAL);
+ relationships = new AtomicReference<>(set);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships.get();
+ }
+
+ @Override
+ public final List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ protected PropertyDescriptor
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .expressionLanguageSupported(true)
+ .required(false)
+ .addValidator(XPROC_OPTION_NAME_VALIDATOR)
+
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING,
true))
+ .dynamic(true)
+ .build();
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
+ Set<ValidationResult> results = new HashSet<>();
+
+ final Set<Relationship> newRelationships = new HashSet<>();
+ newRelationships.add(REL_PIPELINE_FAILURE);
+ newRelationships.add(REL_PIPELINE_ORIGINAL);
+
+ Map<PropertyDescriptor, String> propertyMap =
validationContext.getProperties();
+ final String pipelineFile = propertyMap.get(XML_PIPELINE_FILE);
+ final String pipelineConfig = propertyMap.get(XML_PIPELINE_CONFIG);
+
+ if (StringUtils.isEmpty(pipelineFile) ==
StringUtils.isEmpty(pipelineConfig)) {
+ results.add(new
ValidationResult.Builder().valid(false).explanation(
+ "Exactly one of XML Pipeline File or XML Pipeline Config
must be set").build());
+ this.relationships.set(newRelationships);
+ return results;
+ }
+
+ PipelineXmlData pd = null;
+ try {
+ if (!StringUtils.isEmpty(pipelineFile)) {
+ pd = new PipelineXmlData(new Input(pipelineFile));
+ } else {
+ pd = new PipelineXmlData(pipelineConfig);
+ }
+ } catch (Exception e) {
+ // shouldn't be possible, this should have been validated in
the individual validators
+ results.add(new
ValidationResult.Builder().valid(false).explanation(
+ "Failed to parse pipeline data: " + e).build());
+ this.relationships.set(newRelationships);
+ return results;
+ }
+
+ inputPort = findPrimaryInputPort(pd.pipeline);
+
+ // we know everything is valid, so lets add the new output port
relationships
+ final Set<String> outputPorts = pd.pipeline.getOutputs();
+ for (final String outputPort: outputPorts) {
+ final Relationship outputRel = new Relationship.Builder()
+ .name(outputPort)
+ .description("The XProc output port named '" + outputPort
+ "'")
+ .build();
+ newRelationships.add(outputRel);
+ }
+
+ this.relationships.set(newRelationships);
+
+ return results;
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) throws
SaxonApiException {
+ final String pipelineFile =
context.getProperty(XML_PIPELINE_FILE).getValue();
+ final String pipelineConfig =
context.getProperty(XML_PIPELINE_CONFIG).getValue();
+ final int pipelinePoolSize =
context.getProperty(XML_PIPELINE_POOL_SIZE).asInteger();
+ pipelinePool = new ArrayBlockingQueue<>(pipelinePoolSize);
+ for (int i = 0; i < pipelinePoolSize; i++) {
+ final PipelineXmlData pd;
+ if (!StringUtils.isEmpty(pipelineFile)) {
+ pd = new PipelineXmlData(new Input(pipelineFile));
+ } else {
+ pd = new PipelineXmlData(pipelineConfig);
+ }
+ pipelinePool.add(pd);
+ }
+ }
+
+ @OnStopped
+ public void onStopped(final ProcessContext context) {
+ pipelinePool.clear();
+ pipelinePool = null;
+ }
+
+ private void handleInput(PipelineXmlData pd, ProcessContext context,
FlowFile original, InputStream stream) throws SaxonApiException {
+ XdmNode inputNode = pd.runtime.parse(new InputSource(stream));
+ pd.pipeline.writeTo(inputPort, inputNode);
+
+ for (final Map.Entry<PropertyDescriptor, String> entry :
context.getProperties().entrySet()) {
+ if (!entry.getKey().isDynamic()) {
+ continue;
+ }
+ final QName optName =
QName.fromClarkName(entry.getKey().getName());
+ final String value =
context.newPropertyValue(entry.getValue()).evaluateAttributeExpressions(original).getValue();
+ pd.pipeline.passOption(optName, new RuntimeValue(value));
+ }
+ }
+
+ private void handleOutput(PipelineXmlData pd, ProcessSession session,
FlowFile original, List<Map.Entry<Relationship, FlowFile>> outputs) throws
SaxonApiException {
+ int fragmentCount = 0;
+
+ for (final Relationship rel: getRelationships()) {
+ if (rel == REL_PIPELINE_FAILURE || rel ==
REL_PIPELINE_ORIGINAL) {
+ continue;
+ }
+
+ final ReadablePipe rpipe = pd.pipeline.readFrom(rel.getName());
+ final Serialization serial =
pd.pipeline.getSerialization(rel.getName());
+
+ while (rpipe.moreDocuments()) {
+ final XdmNode node = rpipe.read();
+
+ FlowFile outputFF = session.create(original);
+ outputFF = session.write(outputFF, out -> {
+ final WritableDocument wd = new
WritableDocument(pd.runtime, null, serial, out);
+ wd.write(node);
+ });
+
+ outputFF = session.putAttribute(outputFF,
FRAGMENT_INDEX.key(), String.valueOf(fragmentCount++));
+ outputs.add(new SimpleEntry(rel, outputFF));
+ }
+ }
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ PipelineXmlData pd = null;
+ try {
+ pd = pipelinePool.take();
+ } catch (InterruptedException e) {
+ return;
--- End diff --
I suspect that, instead of simply returning, this should call
Thread.currentThread().interrupt() so that the thread's interrupted status
is restored rather than silently swallowed.
> Add a processor to support XProc XML Pipelines
> ----------------------------------------------
>
> Key: NIFI-3572
> URL: https://issues.apache.org/jira/browse/NIFI-3572
> Project: Apache NiFi
> Issue Type: New Feature
> Reporter: Steve Lawrence
>
> An XProc processor was developed and submitted to the NiFi dev list:
> https://mail-archives.apache.org/mod_mbox/nifi-dev/201703.mbox/%[email protected]%3E
> It would be nice if this could be pulled into NiFi.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)