Github user stevedlawrence commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1575#discussion_r106164474
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PipelineXml.java
---
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.io.IOUtils;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.DynamicRelationship;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import static
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static
org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
+import static
org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.xml.sax.InputSource;
+import javax.xml.transform.sax.SAXSource;
+
+import net.sf.saxon.s9api.DocumentBuilder;
+import net.sf.saxon.s9api.QName;
+import net.sf.saxon.s9api.SaxonApiException;
+import net.sf.saxon.s9api.XdmNode;
+
+import com.xmlcalabash.core.XProcException;
+import com.xmlcalabash.io.ReadablePipe;
+import com.xmlcalabash.io.WritableDocument;
+import com.xmlcalabash.model.RuntimeValue;
+import com.xmlcalabash.model.Serialization;
+import com.xmlcalabash.runtime.XPipeline;
+import com.xmlcalabash.util.Input;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"XML, XProc, XMLCalabash"})
+@CapabilityDescription(
+ "Inserts a FlowFile into a specified XProc XML pipeline, allowing one
to perform " +
+ "complex validations and transformations of XML data within a single
NiFi " +
+ "processor. This processor provides a FlowFile to the primary input
port of the " +
+ "XProc pipeline. It is an error if the XProc pipeline does not define
a primary " +
+ "input port. When data exits the XProc pipeline via one or more output
ports, a " +
+ "FlowFile is created and transferred to a dynamic NiFi relationship
having the " +
+ "same name as the output port. If a failure occurs during XProc
processing, the " +
+ "original FlowFile is transferred to the 'pipeline failure'
relationship and " +
+ "nothing transferred to any of the dynamic output port
relationships. " +
+ "Dynamic properties may be defined, with their names and values passed
to the " +
+ "XProc pipeline as XProc options. Note that all input and output XML
data " +
+ "reside in memory during XML pipeline processing. If memory usage is a
" +
+ "concern, the XML Pipeline Pool Size property could be used to limit
the " +
+ "amount of XML pipelines that run at a single time to help limit this
memory " +
+ "usage. For more information on XProc, visit
http://www.w3.org/TR/xproc/")
+@DynamicProperty(
+ name = "XProc Option Name",
+ value = "XProc Option Value",
+ supportsExpressionLanguage = true,
+ description = "Option names and values passed to the XProc pipeline.
The dynamic " +
+ "property name must be in Clark-notation, {uri}name, though the
{uri} " +
+ "prefix may be optional depending on the XProc file. The property
name " +
+ "is passed directly to the XProc engine as an option name along
with its " +
+ "associated value.")
+@DynamicRelationship(
+ name = "XProc Output Port",
+ description =
+ "A dynamic relationship is created for each output port defined in
the XProc " +
+ "pipeline. When XML is written to an XProc output port, a FlowFile
is created " +
+ "for the XML, which is transferred to the relationship of the same
name. Based on " +
+ "the XProc pipeline, a single input FlowFile could result in
outputs to more " +
+ "than one relationship. If an XProc output port specifies
sequence='true', then " +
+ "multiple FlowFiles could be transferred to the same output
relationship for a " +
+ "single input FlowFile.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "fragment.identifier",
+ description = "All outputs produced from the same input FlowFile
will have the same randomly generated UUID added for this attribute"),
+ @WritesAttribute(attribute = "fragment.index",
+ description = "A one-up number that indicates the ordering of
output port FlowFiles that were created from a single parent FlowFile"),
+ @WritesAttribute(attribute = "fragment.count",
+ description = "The total number of FlowFiles generated from the
input FlowFile"),
+ @WritesAttribute(attribute = "segment.original.filename ",
+ description = "The filename of the input FlowFile")
+})
+public class PipelineXml extends AbstractProcessor {
+
+ private String inputPort = null;
+ private List<PropertyDescriptor> properties;
+ private AtomicReference<Set<Relationship>> relationships = new
AtomicReference<>();
+ private BlockingQueue<PipelineXmlData> pipelinePool = null;
+
+ private static String findPrimaryInputPort(XPipeline pipeline) {
+ for (String port : pipeline.getInputs()) {
+ if (pipeline.getDeclareStep().getInput(port).getPrimary() &&
+
!pipeline.getDeclareStep().getInput(port).getParameterInput()) {
+ return port;
+ }
+ }
+ return null;
+ }
+
+ private static final Validator XML_PIPELINE_VALIDATOR = new
Validator() {
+ @Override
+ public ValidationResult validate(String subject, String input,
ValidationContext context) {
+ final ValidationResult.Builder builder = new
ValidationResult.Builder();
+ builder.subject(subject).input(input);
+
+ PipelineXmlData pd = null;
+ try {
+ if (subject == XML_PIPELINE_CONFIG.getName()) {
+ pd = new PipelineXmlData(input);
+ } else if (subject == XML_PIPELINE_FILE.getName()) {
+ pd = new PipelineXmlData(new Input(input));
+ } else {
+ return builder.valid(false).explanation("Can only
validate XML Pipeline Config or XML Pipeline File").build();
+ }
+ } catch (SaxonApiException | XProcException e) {
+ return builder.valid(false).explanation("XProc pipeline is
invalid: " + e).build();
+ }
+
+ final String inputPort = findPrimaryInputPort(pd.pipeline);
+ if (inputPort == null) {
+ return builder.valid(false).explanation("XProc pipeline
must define a primary non-parameter input port").build();
+ }
+
+ return builder.valid(true).build();
+ }
+ };
+
+ private static final Validator XPROC_OPTION_NAME_VALIDATOR = new
Validator() {
+ @Override
+ public ValidationResult validate(String optName, String optValue,
ValidationContext context) {
+ final ValidationResult.Builder builder = new
ValidationResult.Builder();
+ builder.subject(optName).input(optValue);
+
+ try {
+ QName.fromClarkName(optName);
+ builder.valid(true);
+ } catch (IllegalArgumentException e) {
+ builder.valid(false).explanation("Option name must be of
the form {uri}local.");
+ }
+
+ return builder.build();
+ }
+ };
+
+ // note that the names of these properties and relationships
intentionally
+ // have spaces so they cannot conflict with XProc option and output
port
+ // names, which are restricted to QName's and NCName's
+ public static final PropertyDescriptor XML_PIPELINE_FILE = new
PropertyDescriptor.Builder()
+ .name("xml pipeline file")
+ .displayName("XML Pipeline File")
+ .description("Full path to a file containing the XProc XML
pipeline configuration. Only one of XML Pipeline File or XML Pipeline Config
may be used.")
+ .required(false)
+ .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+ .addValidator(XML_PIPELINE_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor XML_PIPELINE_CONFIG = new
PropertyDescriptor.Builder()
+ .name("xml pipeline config")
+ .displayName("XML Pipeline Config")
+ .description("XProc XML pipeline configuration. Only one of
XML Pipeline File or XML Pipeline Config may be used.")
+ .required(false)
+ .addValidator(Validator.VALID)
+ .addValidator(XML_PIPELINE_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor XML_PIPELINE_POOL_SIZE = new
PropertyDescriptor.Builder()
--- End diff --
This pool size configuration option is really not necessary. Any size over
the number of concurrent tasks will lead to pool instances never being used and
just wasting memory. And any size under the number of concurrent tasks will
effectively limit how many concurrent tasks can run at once. So instead of
making this configurable, the pool size should just be initialized to the
maximum number of concurrent tasks.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---