This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2049d5068b NIFI-11524 Improved ExecuteStreamCommand documentation and
configuration
2049d5068b is described below
commit 2049d5068b8238fc9251d3e735f1d80e0c86558b
Author: Nandor Soma Abonyi <[email protected]>
AuthorDate: Thu May 4 10:52:11 2023 +0200
NIFI-11524 Improved ExecuteStreamCommand documentation and configuration
This closes #7228
Signed-off-by: David Handermann <[email protected]>
---
.../processors/standard/ExecuteStreamCommand.java | 127 +++++++++-------
.../additionalDetails.html | 166 +++++++++++++++++++++
2 files changed, 236 insertions(+), 57 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
index ca7dc62971..01a2fb90d0 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
@@ -65,6 +65,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.lang.ProcessBuilder.Redirect;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -148,14 +149,15 @@ import java.util.regex.Pattern;
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"command execution", "command", "stream", "execute"})
-@CapabilityDescription("Executes an external command on the contents of a flow
file, and creates a new flow file with the results of the command.")
+@CapabilityDescription("The ExecuteStreamCommand processor provides a flexible
way to integrate external commands and scripts into NiFi data flows."
+ + " ExecuteStreamCommand can pass the incoming FlowFile's content to
the command that it executes similarly how piping works.")
@SupportsSensitiveDynamicProperties
@DynamicProperties({
@DynamicProperty(name = "An environment variable name", value = "An
environment variable value",
description = "These environment variables are passed to the
process spawned by this Processor"),
- @DynamicProperty(name = "command.argument.<NUMBER>", value = "Argument
to be supplied to the command",
+ @DynamicProperty(name = "command.argument.<commandIndex>", value =
"Argument to be supplied to the command",
description = "These arguments are supplied to the process
spawned by this Processor when using the "
- + "Command Arguments Strategy : Dynamic Property
Arguments. The NUMBER will determine the order.")
+ + "Command Arguments Strategy : Dynamic Property
Arguments. <commandIndex> is a number and it will determine the order.")
})
@WritesAttributes({
@WritesAttribute(attribute = "execution.command", description = "The
name of the command executed"),
@@ -189,7 +191,7 @@ public class ExecuteStreamCommand extends AbstractProcessor
{
private final static Set<Relationship> OUTPUT_STREAM_RELATIONSHIP_SET;
private final static Set<Relationship> ATTRIBUTE_RELATIONSHIP_SET;
- private static final Pattern DYNAMIC_PARAMETER_NAME =
Pattern.compile("command\\.argument\\.(?<commandIndex>[0-9]+)$");
+ private static final Pattern COMMAND_ARGUMENT_PATTERN =
Pattern.compile("command\\.argument\\.(?<commandIndex>[0-9]+)$");
public static final String executionArguments = "Command Arguments
Property";
public static final String dynamicArguements = "Dynamic Property
Arguments";
@@ -197,8 +199,15 @@ public class ExecuteStreamCommand extends
AbstractProcessor {
"Arguments to be supplied to the executable are taken from the
Command Arguments property");
static final AllowableValue DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY = new
AllowableValue(dynamicArguements, dynamicArguements,
- "Arguments to be supplied to the executable are taken from dynamic
properties");
+ "Arguments to be supplied to the executable are taken from dynamic
properties with pattern of 'command.argument.<commandIndex>'");
+ static final PropertyDescriptor WORKING_DIR = new
PropertyDescriptor.Builder()
+ .name("Working Directory")
+ .description("The directory to use as the current working
directory when executing the command")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+
.addValidator(StandardValidators.createDirectoryExistsValidator(true, true))
+ .required(false)
+ .build();
private static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR =
StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING,
true);
static final PropertyDescriptor EXECUTION_COMMAND = new
PropertyDescriptor.Builder()
@@ -215,38 +224,36 @@ public class ExecuteStreamCommand extends
AbstractProcessor {
.description("Strategy for configuring arguments to be supplied to
the command.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
- .allowableValues(EXECUTION_ARGUMENTS_PROPERTY_STRATEGY.getValue(),
DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue())
+ .allowableValues(EXECUTION_ARGUMENTS_PROPERTY_STRATEGY,
DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY)
.defaultValue(EXECUTION_ARGUMENTS_PROPERTY_STRATEGY.getValue())
.build();
static final PropertyDescriptor EXECUTION_ARGUMENTS = new
PropertyDescriptor.Builder()
.name("Command Arguments")
.description("The arguments to supply to the executable delimited
by the ';' character.")
+ .dependsOn(ARGUMENTS_STRATEGY,
EXECUTION_ARGUMENTS_PROPERTY_STRATEGY)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(new Validator() {
-
- @Override
- public ValidationResult validate(String subject, String input,
ValidationContext context) {
- ValidationResult result = new ValidationResult.Builder()
- .subject(subject).valid(true).input(input).build();
- List<String> args = ArgumentUtils.splitArgs(input,
context.getProperty(ARG_DELIMITER).getValue().charAt(0));
- for (String arg : args) {
- ValidationResult valResult =
ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR.validate(subject, arg, context);
- if (!valResult.isValid()) {
- result = valResult;
- break;
- }
+ .addValidator((subject, input, context) -> {
+ ValidationResult result = new ValidationResult.Builder()
+ .subject(subject).valid(true).input(input).build();
+ List<String> args = ArgumentUtils.splitArgs(input,
context.getProperty(ExecuteStreamCommand.ARG_DELIMITER).getValue().charAt(0));
+ for (String arg : args) {
+ ValidationResult valResult =
ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR.validate(subject, arg, context);
+ if (!valResult.isValid()) {
+ result = valResult;
+ break;
}
- return result;
}
+ return result;
}).build();
- static final PropertyDescriptor WORKING_DIR = new
PropertyDescriptor.Builder()
- .name("Working Directory")
- .description("The directory to use as the current working
directory when executing the command")
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-
.addValidator(StandardValidators.createDirectoryExistsValidator(true, true))
- .required(false)
+ static final PropertyDescriptor ARG_DELIMITER = new
PropertyDescriptor.Builder()
+ .name("Argument Delimiter")
+ .description("Delimiter to use to separate arguments for a command
[default: ;]. Must be a single character")
+ .dependsOn(ARGUMENTS_STRATEGY,
EXECUTION_ARGUMENTS_PROPERTY_STRATEGY)
+ .addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
+ .required(true)
+ .defaultValue(";")
.build();
static final PropertyDescriptor IGNORE_STDIN = new
PropertyDescriptor.Builder()
@@ -273,33 +280,21 @@ public class ExecuteStreamCommand extends
AbstractProcessor {
.defaultValue("256")
.build();
- private static final Validator characterValidator = new
StandardValidators.StringLengthValidator(1, 1);
-
- static final PropertyDescriptor ARG_DELIMITER = new
PropertyDescriptor.Builder()
- .name("Argument Delimiter")
- .description("Delimiter to use to separate arguments for a command
[default: ;]. Must be a single character")
- .addValidator(Validator.VALID)
- .addValidator(characterValidator)
- .required(true)
- .defaultValue(";")
- .build();
-
private static final List<PropertyDescriptor> PROPERTIES;
private static final String MASKED_ARGUMENT = "********";
static {
List<PropertyDescriptor> props = new ArrayList<>();
+ props.add(WORKING_DIR);
+ props.add(EXECUTION_COMMAND);
props.add(ARGUMENTS_STRATEGY);
props.add(EXECUTION_ARGUMENTS);
- props.add(EXECUTION_COMMAND);
- props.add(IGNORE_STDIN);
- props.add(WORKING_DIR);
props.add(ARG_DELIMITER);
+ props.add(IGNORE_STDIN);
props.add(PUT_OUTPUT_IN_ATTRIBUTE);
props.add(PUT_ATTRIBUTE_MAX_LENGTH);
PROPERTIES = Collections.unmodifiableList(props);
-
Set<Relationship> outputStreamRelationships = new HashSet<>();
outputStreamRelationships.add(OUTPUT_STREAM_RELATIONSHIP);
outputStreamRelationships.add(ORIGINAL_RELATIONSHIP);
@@ -343,17 +338,7 @@ public class ExecuteStreamCommand extends
AbstractProcessor {
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
- if (!propertyDescriptorName.startsWith("command.argument.")) {
- return new PropertyDescriptor.Builder()
- .name(propertyDescriptorName)
- .description(
- "Sets the environment variable '" +
propertyDescriptorName + "' for the process' environment")
- .dynamic(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- }
- // get the number part of the name
- Matcher matcher =
DYNAMIC_PARAMETER_NAME.matcher(propertyDescriptorName);
+ final Matcher matcher =
COMMAND_ARGUMENT_PATTERN.matcher(propertyDescriptorName);
if (matcher.matches()) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
@@ -363,8 +348,37 @@ public class ExecuteStreamCommand extends
AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.build();
+ } else {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .description("Sets the environment variable '" +
propertyDescriptorName + "' for the process' environment")
+ .dynamic(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
}
- return null;
+ }
+
+    /**
+     * Runs the inherited validation and warns about command argument dynamic properties that will be ignored.
+     *
+     * <p>When the configured Command Arguments Strategy is not Dynamic Property Arguments, a warning is
+     * logged for every dynamic property whose name matches {@code command.argument.<commandIndex>}.
+     * No validation result is added for such properties, so the returned list holds only the results
+     * produced by {@code super.customValidate}.</p>
+     *
+     * @param validationContext context giving access to the configured properties and their values
+     * @return the validation results produced by the superclass
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
+
+        final String argumentStrategy = validationContext.getProperty(ARGUMENTS_STRATEGY).getValue();
+        // Compare by value, not by reference: the configured value is not guaranteed to be the same
+        // String instance as the AllowableValue constant, so '!=' could report a mismatch (and log
+        // spurious warnings) even when the Dynamic Property Arguments strategy is selected.
+        if (!DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue().equals(argumentStrategy)) {
+            for (final PropertyDescriptor propertyDescriptor : validationContext.getProperties().keySet()) {
+                if (!propertyDescriptor.isDynamic()) {
+                    continue;
+                }
+
+                final String propertyName = propertyDescriptor.getName();
+                final Matcher matcher = COMMAND_ARGUMENT_PATTERN.matcher(propertyName);
+                if (matcher.matches()) {
+                    logger.warn("[{}] should be set to [{}] when command arguments are supplied as Dynamic Properties. The property [{}] will be ignored.",
+                            ARGUMENTS_STRATEGY.getDisplayName(), DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getDisplayName(), propertyName);
+                }
+            }
+        }
+
+        return validationResults;
     }
@Override
@@ -393,21 +407,20 @@ public class ExecuteStreamCommand extends
AbstractProcessor {
.splitArgs(commandArguments,
context.getProperty(ARG_DELIMITER).getValue().charAt(0)));
}
} else {
-
List<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
for (final Map.Entry<PropertyDescriptor, String> entry :
context.getProperties().entrySet()) {
- Matcher matcher =
DYNAMIC_PARAMETER_NAME.matcher(entry.getKey().getName());
+ Matcher matcher =
COMMAND_ARGUMENT_PATTERN.matcher(entry.getKey().getName());
if (matcher.matches()) {
propertyDescriptors.add(entry.getKey());
}
}
propertyDescriptors.sort((p1, p2) -> {
- Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(p1.getName());
+ Matcher matcher =
COMMAND_ARGUMENT_PATTERN.matcher(p1.getName());
String indexString1 = null;
while (matcher.find()) {
indexString1 = matcher.group("commandIndex");
}
- matcher = DYNAMIC_PARAMETER_NAME.matcher(p2.getName());
+ matcher = COMMAND_ARGUMENT_PATTERN.matcher(p2.getName());
String indexString2 = null;
while (matcher.find()) {
indexString2 = matcher.group("commandIndex");
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ExecuteStreamCommand/additionalDetails.html
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ExecuteStreamCommand/additionalDetails.html
new file mode 100644
index 0000000000..23fd22f0da
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ExecuteStreamCommand/additionalDetails.html
@@ -0,0 +1,166 @@
+<!DOCTYPE html>
+<html lang="en">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <head>
+ <meta charset="utf-8" />
+ <title>ExecuteStreamCommand</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css"
type="text/css" />
+ </head>
+
+ <body>
+ <h2>Description</h2>
+ <p>The ExecuteStreamCommand processor provides a flexible way to
integrate external commands and scripts into NiFi data flows.
+ ExecuteStreamCommand can pass the incoming FlowFile's content to the
command that it executes, similarly to how piping works.</p>
+
+ <h2>Configuration options</h2>
+
+ <h3>Working Directory</h3>
+ <p>If not specified, NiFi root will be the default working
directory.</p>
+
+ <h3>Configuring command arguments</h3>
+ <p>The ExecuteStreamCommand processor provides two ways to specify
command arguments: using Dynamic Properties and the Command Arguments field.</p>
+
+ <h4>Command Arguments field</h4>
+ <p>This is the default. If there are multiple arguments, they need to
be separated by a character specified in the Argument Delimiter field.
+ When needed, '-' and '--' can be provided, but in these cases Argument
Delimiter should be a different character.</p>
+
+ <p>Consider that we want to list all files in a directory which is
different from the working directory:</p>
+
+ <table>
+ <tr>
+ <th>Command Path</th>
+ <th>Command Arguments</th>
+ </tr>
+ <tr>
+ <td>ls</td>
+ <td>-lah;/path/to/dir</td>
+ </tr>
+ </table>
+
+ <p><b>NOTE:</b> the command should be on <code>$PATH</code> or it
should be in the working directory; otherwise, the full path to the command should also be specified.</p>
+
+ <h4>Dynamic Properties</h4>
+ <p>Arguments can be specified with Dynamic Properties. Dynamic
Properties with the pattern of 'command.argument.&lt;commandIndex&gt;' will be
appended
+ to the command in ascending order.</p>
+
+ <p>The above example with dynamic properties would look like this:</p>
+
+ <table>
+ <tr>
+ <th>Property Name</th>
+ <th>Property Value</th>
+ </tr>
+ <tr>
+ <td>command.argument.0</td>
+ <td>-lah</td>
+ </tr>
+ <tr>
+ <td>command.argument.1</td>
+ <td>/path/to/dir</td>
+ </tr>
+ </table>
+
+ <h3>Configuring environment variables</h3>
+ <p>In addition to specifying command arguments using the Command
Arguments field or Dynamic Properties, users can also use environment variables
with
+ the ExecuteStreamCommand processor. Environment variables are a set of
key-value pairs that can be accessed by processes running on the system.
+ ExecuteStreamCommand treats every Dynamic Property whose name doesn't
match the pattern 'command.argument.&lt;commandIndex&gt;' as an
environment variable.</p>
+
+ <p>Consider that we want to execute a Maven command with the
processor. If there are multiple Java versions installed on the system, you can
specify
+ which version will be used by setting the <code>JAVA_HOME</code>
environment variable. The output FlowFile will look like this if we run
+ the <code>mvn</code> command with the <code>--version</code> argument:</p>
+
+ <code>
+ <pre style="overflow:scroll">
+Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
+Maven home: /path/to/maven/home
+Java version: 11.0.18, vendor: Eclipse Adoptium, runtime:
/path/to/default/java/home
+Default locale: en_US, platform encoding: UTF-8
+OS name: "mac os x", version: "13.1", arch: "x86_64", family: "mac"
+ </pre>
+ </code>
+
+ <table>
+ <tr>
+ <th>Property Name</th>
+ <th>Property Value</th>
+ </tr>
+ <tr>
+ <td>JAVA_HOME</td>
+ <td>/path/to/another/java/home</td>
+ </tr>
+ </table>
+
+ <p>After specifying the <code>JAVA_HOME</code> property, you can
see that Maven is using a different runtime:</p>
+ <code>
+ <pre>
+Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
+Maven home: /path/to/maven/home
+Java version: 11.0.18, vendor: Eclipse Adoptium, runtime:
/path/to/another/java/home
+Default locale: en_US, platform encoding: UTF-8
+OS name: "mac os x", version: "13.1", arch: "x86_64", family: "mac"
+ </pre>
+ </code>
+
+ <h3>Streaming input to the command</h3>
+ <p>ExecuteStreamCommand passes the incoming FlowFile's content to the
command that it executes, similarly to how piping works.
+ It is possible to disable this behavior with the Ignore STDIN
property. In the above examples we didn't use the
+ incoming FlowFile's content, so in this case we could leverage this
property for additional performance gain.</p>
+
+ <p>To utilize the streaming capability, consider that we want to use
<code>grep</code> on the FlowFile. Let's presume that
+ we need to list all <code>POST</code> requests from an Apache HTTPD
log:</p>
+ <code>
+ <pre style="overflow:scroll">
+127.0.0.1 - - [03/May/2023:13:54:26 +0000] "GET /example-page HTTP/1.1" 200
4825 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101
Firefox/88.0"
+127.0.0.1 - - [03/May/2023:14:05:32 +0000] "POST /submit-form HTTP/1.1" 302 0
"http://localhost/example-page" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64;
rv:88.0) Gecko/20100101 Firefox/88.0"
+127.0.0.1 - - [03/May/2023:14:10:48 +0000] "GET /image.jpg HTTP/1.1" 200 35785
"http://localhost/example-page" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64;
rv:88.0) Gecko/20100101 Firefox/88.0"
+127.0.0.1 - - [03/May/2023:14:20:15 +0000] "GET /example-page HTTP/1.1" 200
4825 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101
Firefox/88.0"
+127.0.0.1 - - [03/May/2023:14:30:42 +0000] "GET /example-page HTTP/1.1" 200
4825 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101
Firefox/88.0"
+ </pre>
+ </code>
+
+ <p>Processor configuration:</p>
+ <table>
+ <tr>
+ <th>Working Directory</th>
+ <th>Command Path</th>
+ <th>Command Arguments Strategy</th>
+ <th>Command Arguments</th>
+ <th>Argument Delimiter</th>
+ <th>Ignore STDIN</th>
+ <th>Output Destination Attribute</th>
+ <th>Max Attribute Length</th>
+ </tr>
+ <tr>
+ <td></td>
+ <td>grep</td>
+ <td>Command Arguments Property</td>
+ <td>POST</td>
+ <td>;</td>
+ <td>false</td>
+ <td></td>
+ <td>256</td>
+ </tr>
+ </table>
+
+ <p>With this the emitted FlowFile on the "output stream" relationship
should be:</p>
+ <code>
+ <pre style="overflow:scroll">
+127.0.0.1 - - [03/May/2023:14:05:32 +0000] "POST /submit-form HTTP/1.1" 302 0
"http://localhost/example-page" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64;
rv:88.0) Gecko/20100101 Firefox/88.0"
+ </pre>
+ </code>
+
+ </body>
+</html>