This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new ee813d9650 NIFI-12011 Added MIME Type to ExecuteStreamCommand and ExecuteProcess
ee813d9650 is described below
commit ee813d965096f18a0ec234a6c53a2c29919646f1
Author: Matt Burgess <[email protected]>
AuthorDate: Tue Aug 29 18:41:03 2023 -0400
NIFI-12011 Added MIME Type to ExecuteStreamCommand and ExecuteProcess
This closes #7660
Signed-off-by: David Handermann <[email protected]>
(cherry picked from commit 24736f62767a88669a6845a6e3f21ce801b8b645)
---
.../nifi/processors/standard/ExecuteProcess.java | 27 ++++++++++++++++++----
.../processors/standard/ExecuteStreamCommand.java | 16 ++++++++++++-
.../nifi/processors/standard/GenerateFlowFile.java | 5 ++++
.../processors/standard/TestExecuteProcess.java | 11 +++++----
.../standard/TestExecuteStreamCommand.java | 5 ++++
5 files changed, 53 insertions(+), 11 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
index 244d5aaa23..6ce06df09e 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
@@ -33,6 +33,7 @@ import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -83,7 +84,8 @@ import java.util.concurrent.locks.ReentrantLock;
)
@WritesAttributes({
@WritesAttribute(attribute = "command", description = "Executed command"),
- @WritesAttribute(attribute = "command.arguments", description = "Arguments
of the command")
+ @WritesAttribute(attribute = "command.arguments", description = "Arguments
of the command"),
+ @WritesAttribute(attribute = "mime.type", description = "Sets the MIME
type of the output if the 'Output MIME Type' property is set and 'Batch
Duration' is not set")
})
public class ExecuteProcess extends AbstractProcessor {
@@ -146,6 +148,13 @@ public class ExecuteProcess extends AbstractProcessor {
.defaultValue(" ")
.build();
+ static final PropertyDescriptor MIME_TYPE = new
PropertyDescriptor.Builder()
+ .name("Output MIME type")
+ .displayName("Output MIME Type")
+ .description("Specifies the value to set for the \"mime.type\"
attribute. This property is ignored if 'Batch Duration' is set.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -173,6 +182,7 @@ public class ExecuteProcess extends AbstractProcessor {
properties.add(REDIRECT_ERROR_STREAM);
properties.add(WORKING_DIR);
properties.add(ARG_DELIMITER);
+ properties.add(MIME_TYPE);
return properties;
}
@@ -265,6 +275,7 @@ public class ExecuteProcess extends AbstractProcessor {
try {
longRunningProcess.get();
} catch (final InterruptedException ie) {
+ // Ignore
} catch (final ExecutionException ee) {
getLogger().error("Process execution failed due to
{}", new Object[] { ee.getCause() });
}
@@ -273,6 +284,7 @@ public class ExecuteProcess extends AbstractProcessor {
try {
TimeUnit.NANOSECONDS.sleep(batchNanos);
} catch (final InterruptedException ie) {
+ // Ignore
}
}
@@ -290,15 +302,20 @@ public class ExecuteProcess extends AbstractProcessor {
session.remove(flowFile);
getLogger().error("Failed to read data from Process, so will not
generate FlowFile");
} else {
- // add command and arguments as attribute
- flowFile = session.putAttribute(flowFile, ATTRIBUTE_COMMAND,
command);
+ // add command, arguments, and MIME type as attributes
+ Map<String,String> attributes = new HashMap<>();
+ attributes.put(ATTRIBUTE_COMMAND, command);
if(arguments != null) {
- flowFile = session.putAttribute(flowFile,
ATTRIBUTE_COMMAND_ARGS, arguments);
+ attributes.put(ATTRIBUTE_COMMAND_ARGS, arguments);
+ }
+ if (batchNanos == null &&
context.getProperty(ExecuteProcess.MIME_TYPE).isSet()) {
+ attributes.put(CoreAttributes.MIME_TYPE.key(),
context.getProperty(ExecuteProcess.MIME_TYPE).getValue());
}
+ flowFile = session.putAllAttributes(flowFile, attributes);
// All was good. Generate event and transfer FlowFile.
session.getProvenanceReporter().create(flowFile, "Created from
command: " + commandString);
- getLogger().info("Created {} and routed to success", new Object[]
{ flowFile });
+ getLogger().info("Created {} and routed to success", flowFile);
session.transfer(flowFile, REL_SUCCESS);
}
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
index 934593e68d..26a0d86922 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
@@ -42,6 +42,7 @@ import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -161,7 +162,8 @@ import java.util.regex.Pattern;
@WritesAttribute(attribute = "execution.command", description = "The
name of the command executed"),
@WritesAttribute(attribute = "execution.command.args", description =
"The semi-colon delimited list of arguments. Sensitive properties will be
masked"),
@WritesAttribute(attribute = "execution.status", description = "The
exit status code returned from executing the command"),
- @WritesAttribute(attribute = "execution.error", description = "Any
error messages returned from executing the command")})
+ @WritesAttribute(attribute = "execution.error", description = "Any
error messages returned from executing the command"),
+ @WritesAttribute(attribute = "mime.type", description = "Sets the MIME
type of the output if the 'Output MIME Type' property is set and 'Output
Destination Attribute' is not set")})
@Restricted(
restrictions = {
@Restriction(
@@ -276,6 +278,14 @@ public class ExecuteStreamCommand extends
AbstractProcessor {
.defaultValue("256")
.build();
+ static final PropertyDescriptor MIME_TYPE = new
PropertyDescriptor.Builder()
+ .name("Output MIME Type")
+ .displayName("Output MIME Type")
+ .description("Specifies the value to set for the \"mime.type\"
attribute. This property is ignored if 'Output Destination Attribute' is set.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
private static final List<PropertyDescriptor> PROPERTIES;
private static final String MASKED_ARGUMENT = "********";
@@ -289,6 +299,7 @@ public class ExecuteStreamCommand extends AbstractProcessor
{
props.add(IGNORE_STDIN);
props.add(PUT_OUTPUT_IN_ATTRIBUTE);
props.add(PUT_ATTRIBUTE_MAX_LENGTH);
+ props.add(MIME_TYPE);
PROPERTIES = Collections.unmodifiableList(props);
Set<Relationship> outputStreamRelationships = new HashSet<>();
@@ -518,6 +529,9 @@ public class ExecuteStreamCommand extends AbstractProcessor
{
attributes.put("execution.status", Integer.toString(exitCode));
attributes.put("execution.command", executeCommand);
attributes.put("execution.command.args", commandArguments);
+ if (context.getProperty(MIME_TYPE).isSet() && !putToAttribute) {
+ attributes.put(CoreAttributes.MIME_TYPE.key(),
context.getProperty(MIME_TYPE).getValue());
+ }
outputFlowFile = session.putAllAttributes(outputFlowFile,
attributes);
if
(NONZERO_STATUS_RELATIONSHIP.equals(outputFlowFileRelationship)) {
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java
index 5c9b7cffd3..d8846c767d 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java
@@ -34,6 +34,8 @@ import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -64,6 +66,9 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY,
description = "Specifies an attribute on generated FlowFiles defined
by the Dynamic Property's key and value." +
" If Expression Language is used, evaluation will be performed only
once per batch of generated FlowFiles.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "mime.type", description = "Sets the MIME
type of the output if the 'Mime Type' property is set"),
+})
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
public class GenerateFlowFile extends AbstractProcessor {
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java
index fea47deaad..fbd7fea51d 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java
@@ -21,6 +21,7 @@ import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.ArgumentUtils;
import org.apache.nifi.util.LogMessage;
@@ -90,6 +91,7 @@ public class TestExecuteProcess {
runner.setProperty(ExecuteProcess.COMMAND, "echo");
runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, "test-args");
runner.setProperty(ExecuteProcess.BATCH_DURATION, "500 millis");
+ runner.setProperty(ExecuteProcess.MIME_TYPE, "application/json");
runner.run();
@@ -97,6 +99,7 @@ public class TestExecuteProcess {
for (final MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile);
System.out.println(new String(flowFile.toByteArray()));
+ flowFile.assertAttributeNotExists(CoreAttributes.MIME_TYPE.key());
}
}
@@ -124,7 +127,7 @@ public class TestExecuteProcess {
final List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS);
if(!flowFiles.isEmpty()) {
-
assertTrue(flowFiles.get(0).getAttribute("command").equals("ping"));
+ assertEquals("ping", flowFiles.get(0).getAttribute("command"));
}
}
@@ -151,7 +154,6 @@ public class TestExecuteProcess {
for (final MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile);
totalFlowFilesSize += flowFile.getSize();
- // System.out.println(new String(flowFile.toByteArray()));
}
assertEquals(inFile.length(), totalFlowFilesSize);
@@ -198,12 +200,10 @@ public class TestExecuteProcess {
final List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS);
long totalFlowFilesSize = 0;
for (final MockFlowFile flowFile : flowFiles) {
- System.out.println(flowFile);
totalFlowFilesSize += flowFile.getSize();
- // System.out.println(new String(flowFile.toByteArray()));
}
- // assertEquals(inFile.length(), totalFlowFilesSize);
+ assertEquals(inFile.length(), totalFlowFilesSize);
}
@Test
@@ -233,6 +233,7 @@ public class TestExecuteProcess {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
+ // Ignore
}
}
final List<LogMessage> warnMessages =
runner.getLogger().getWarnMessages();
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
index 9f11c6022d..d4f7fac4ab 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.FileUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processors.standard.util.ArgumentUtils;
import org.apache.nifi.util.LogMessage;
import org.apache.nifi.util.MockFlowFile;
@@ -85,6 +86,7 @@ public class TestExecuteStreamCommand {
String jarPath = exJar.getAbsolutePath();
exJar.setExecutable(true);
final TestRunner controller =
TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.setProperty(ExecuteStreamCommand.MIME_TYPE.getName(),
"text/plain");
controller.enqueue(dummy.toPath());
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS,
"-jar;" + jarPath);
@@ -103,6 +105,7 @@ public class TestExecuteStreamCommand {
String attribute =
outputFlowFile.getAttribute("execution.command.args");
String expected = "src" + File.separator + "test" + File.separator +
"resources" + File.separator + "ExecuteCommand" + File.separator +
"TestSuccess.jar";
assertEquals(expected, attribute.substring(attribute.length() -
expected.length()));
+ outputFlowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),
"text/plain");
MockFlowFile originalFlowFile =
controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP).get(0);
assertEquals(outputFlowFile.getAttribute("execution.status"),
originalFlowFile.getAttribute("execution.status"));
@@ -506,6 +509,7 @@ public class TestExecuteStreamCommand {
File dummy = new File("src/test/resources/hello.txt");
assertTrue(dummy.exists());
final TestRunner controller =
TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.setProperty(ExecuteStreamCommand.MIME_TYPE,
"application/json");
controller.enqueue("".getBytes());
if(isWindows()) {
@@ -526,6 +530,7 @@ public class TestExecuteStreamCommand {
List<MockFlowFile> flowFiles =
controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
MockFlowFile outputFlowFile = flowFiles.get(0);
outputFlowFile.assertContentEquals("");
+
outputFlowFile.assertAttributeNotExists(CoreAttributes.MIME_TYPE.key());
String ouput =
outputFlowFile.getAttribute("executeStreamCommand.output");
assertTrue(ouput.startsWith("Hello"));
assertEquals("0", outputFlowFile.getAttribute("execution.status"));