This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 991e5e24de NIFI-4957 Add Resource File Support for Jolt Specifications
991e5e24de is described below
commit 991e5e24dece9f1015ef56b15a152fbdeeaa78d8
Author: Matthew Burgess <[email protected]>
AuthorDate: Mon Feb 10 13:02:49 2020 -0500
NIFI-4957 Add Resource File Support for Jolt Specifications
This closes #4044
Signed-off-by: David Handermann <[email protected]>
---
.../jolt/record/JoltTransformRecord.java | 122 +++++++++++-------
.../jolt/record/TestJoltTransformRecord.java | 73 ++++++++---
.../processors/standard/JoltTransformJSON.java | 137 +++++++++++++--------
.../processors/standard/TestJoltTransformJSON.java | 60 ++++++++-
4 files changed, 277 insertions(+), 115 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
index 9293e4c23f..0467cf1b2c 100644
---
a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
+++
b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
@@ -34,9 +34,11 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceReference;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@@ -60,10 +62,15 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
+import java.io.BufferedReader;
+import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.io.OutputStream;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -86,7 +93,7 @@ import java.util.stream.Collectors;
@WritesAttribute(attribute = "record.count", description = "The number
of records in an outgoing FlowFile"),
@WritesAttribute(attribute = "mime.type", description = "The MIME Type
that the configured Record Writer indicates is appropriate"),
})
-@CapabilityDescription("Applies a list of Jolt specifications to the FlowFile
payload. A new FlowFile is created "
+@CapabilityDescription("Applies a JOLT specification to each record in the
FlowFile payload. A new FlowFile is created "
+ "with transformed content and is routed to the 'success'
relationship. If the transform "
+ "fails, the original FlowFile is routed to the 'failure'
relationship.")
@RequiresInstanceClassLoading
@@ -141,9 +148,11 @@ public class JoltTransformRecord extends AbstractProcessor
{
static final PropertyDescriptor JOLT_SPEC = new
PropertyDescriptor.Builder()
.name("jolt-record-spec")
.displayName("Jolt Specification")
- .description("Jolt Specification for transform of record data.
This value is ignored if the Jolt Sort Transformation is selected.")
+ .description("Jolt Specification for transform of record data. The
value for this property may be the text of a JOLT specification "
+ + "or the path to a file containing a JOLT specification.
This value is ignored if the Jolt Sort Transformation is selected.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .identifiesExternalResource(ResourceCardinality.SINGLE,
ResourceType.FILE, ResourceType.TEXT)
.required(false)
.build();
@@ -238,19 +247,26 @@ public class JoltTransformRecord extends
AbstractProcessor {
final List<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
final String transform =
validationContext.getProperty(JOLT_TRANSFORM).getValue();
final String customTransform =
validationContext.getProperty(CUSTOM_CLASS).getValue();
- if (!validationContext.getProperty(JOLT_SPEC).isSet() ||
StringUtils.isEmpty(validationContext.getProperty(JOLT_SPEC).getValue())) {
- if (!SORTR.getValue().equals(transform)) {
- final String message = "A specification is required for this
transformation";
- results.add(new ValidationResult.Builder().valid(false)
- .explanation(message)
- .build());
- }
+ final String modulePath =
validationContext.getProperty(MODULES).isSet()?
validationContext.getProperty(MODULES).getValue() : null;
+ final String joltSpecValue =
validationContext.getProperty(JOLT_SPEC).getValue();
+
+ if (StringUtils.isEmpty(joltSpecValue) &&
!SORTR.getValue().equals(transform)) {
+ results.add(new
ValidationResult.Builder().subject(JOLT_SPEC.getDisplayName()).valid(false).explanation(
+ "'Jolt Specification' must be set, or the
Transformation must be 'Sort'").build());
} else {
+ final ClassLoader customClassLoader;
+
try {
- final String specValue =
validationContext.getProperty(JOLT_SPEC).getValue();
+ if (modulePath != null) {
+ customClassLoader =
ClassLoaderUtils.getCustomClassLoader(modulePath,
this.getClass().getClassLoader(), getJarFilenameFilter());
+ } else {
+ customClassLoader = this.getClass().getClassLoader();
+ }
+
+ final boolean elPresent =
validationContext.isExpressionLanguagePresent(joltSpecValue);
- if (validationContext.isExpressionLanguagePresent(specValue) )
{
- final String invalidExpressionMsg =
validationContext.newExpressionLanguageCompiler().validateExpression(specValue,
true);
+ if (elPresent) {
+ final String invalidExpressionMsg =
validationContext.newExpressionLanguageCompiler().validateExpression(joltSpecValue,
true);
if (!StringUtils.isEmpty(invalidExpressionMsg)) {
results.add(new ValidationResult.Builder().valid(false)
.subject(JOLT_SPEC.getDisplayName())
@@ -258,28 +274,24 @@ public class JoltTransformRecord extends
AbstractProcessor {
.build());
}
} else {
- //for validation we want to be able to ensure the spec is
syntactically correct and not try to resolve variables since they may not exist
yet
- Object specJson = SORTR.getValue().equals(transform) ?
null : JsonUtils.jsonToObject(specValue.replaceAll("\\$\\{", "\\\\\\\\\\$\\{"),
DEFAULT_CHARSET);
-
- if (CUSTOMR.getValue().equals(transform)) {
- if (StringUtils.isEmpty(customTransform)) {
- final String customMessage = "A custom
transformation class should be provided. ";
- results.add(new
ValidationResult.Builder().valid(false)
- .explanation(customMessage)
- .build());
- } else if
(validationContext.isExpressionLanguagePresent(customTransform)) {
- final String invalidExpressionMsg =
validationContext.newExpressionLanguageCompiler().validateExpression(customTransform,
true);
- if (!StringUtils.isEmpty(invalidExpressionMsg)) {
+ if (!SORTR.getValue().equals(transform)) {
+
+ //for validation we want to be able to ensure the spec
is syntactically correct and not try to resolve variables since they may not
exist yet
+ final String content =
readTransform(validationContext.getProperty(JOLT_SPEC));
+ final Object specJson =
JsonUtils.jsonToObject(content.replaceAll("\\$\\{", "\\\\\\\\\\$\\{"),
DEFAULT_CHARSET);
+
+ if (CUSTOMR.getValue().equals(transform)) {
+ if (StringUtils.isEmpty(customTransform)) {
+ final String customMessage = "A custom
transformation class should be provided. ";
results.add(new
ValidationResult.Builder().valid(false)
- .subject(CUSTOM_CLASS.getDisplayName())
- .explanation("Invalid Expression
Language: " + invalidExpressionMsg)
+ .explanation(customMessage)
.build());
+ } else {
+
TransformFactory.getCustomTransform(customClassLoader, customTransform,
specJson);
}
} else {
-
TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(),
customTransform, specJson);
+ TransformFactory.getTransform(customClassLoader,
transform, specJson);
}
- } else {
-
TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(),
transform, specJson);
}
}
} catch (final Exception e) {
@@ -294,7 +306,6 @@ public class JoltTransformRecord extends AbstractProcessor {
return results;
}
- @SuppressWarnings("unchecked")
@Override
public void onTrigger(final ProcessContext context, ProcessSession
session) throws ProcessException {
final FlowFile original = session.get();
@@ -337,7 +348,7 @@ public class JoltTransformRecord extends AbstractProcessor {
}
transformed = session.putAllAttributes(transformed,
attributes);
- logger.info("{} had no Records to transform", new
Object[]{original});
+ logger.info("{} had no Records to transform", original);
} else {
final JoltTransform transform = getTransform(context,
original);
@@ -375,9 +386,6 @@ public class JoltTransformRecord extends AbstractProcessor {
while ((record = reader.nextRecord()) != null) {
final List<Record> transformedRecords =
transform(record, transform);
- if (transformedRecords == null) {
- throw new ProcessException("Error transforming the
record");
- }
for (Record transformedRecord : transformedRecords) {
writer.write(transformedRecord);
}
@@ -388,7 +396,7 @@ public class JoltTransformRecord extends AbstractProcessor {
try {
writer.close();
} catch (final IOException ioe) {
- getLogger().warn("Failed to close Writer for {}", new
Object[]{transformed});
+ getLogger().warn("Failed to close Writer for {}",
transformed);
}
attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
@@ -399,10 +407,10 @@ public class JoltTransformRecord extends
AbstractProcessor {
final String transformType =
context.getProperty(JOLT_TRANSFORM).getValue();
transformed = session.putAllAttributes(transformed,
attributes);
session.getProvenanceReporter().modifyContent(transformed,
"Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- logger.debug("Transformed {}", new Object[]{original});
+ logger.debug("Transform completed {}", original);
}
- } catch (final Exception ex) {
- logger.error("Unable to transform {} due to {}", new
Object[]{original, ex.toString(), ex});
+ } catch (final Exception e) {
+ logger.error("Transform failed for {}", original, e);
session.transfer(original, REL_FAILURE);
if (transformed != null) {
session.remove(transformed);
@@ -449,13 +457,15 @@ public class JoltTransformRecord extends
AbstractProcessor {
final Optional<String> specString;
if (context.getProperty(JOLT_SPEC).isSet()) {
specString =
Optional.of(context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue());
- } else {
+ } else if
(SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
specString = Optional.empty();
+ } else {
+ throw new IllegalArgumentException("'Jolt Specification' must be
set, or the Transformation must be Sort.");
}
return transformCache.get(specString, currString -> {
try {
- return createTransform(context, currString.orElse(null),
flowFile);
+ return createTransform(context, flowFile);
} catch (Exception e) {
getLogger().error("Problem getting transform", e);
}
@@ -463,6 +473,27 @@ public class JoltTransformRecord extends AbstractProcessor
{
});
}
+ private String readTransform(final PropertyValue propertyValue, final
FlowFile flowFile) {
+ final String transform;
+
+ if (propertyValue.isExpressionLanguagePresent()) {
+ transform =
propertyValue.evaluateAttributeExpressions(flowFile).getValue();
+ } else {
+ transform = readTransform(propertyValue);
+ }
+
+ return transform;
+ }
+
+ private String readTransform(final PropertyValue propertyValue) {
+ final ResourceReference resourceReference = propertyValue.asResource();
+ try (final BufferedReader reader = new BufferedReader(new
InputStreamReader(resourceReference.read()))) {
+ return reader.lines().collect(Collectors.joining());
+ } catch (final IOException e) {
+ throw new UncheckedIOException("Read JOLT Transform failed", e);
+ }
+ }
+
@OnScheduled
public void setup(final ProcessContext context) {
int maxTransformsToCache =
context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
@@ -471,10 +502,11 @@ public class JoltTransformRecord extends
AbstractProcessor {
.build();
}
- private JoltTransform createTransform(final ProcessContext context, final
String specString, final FlowFile flowFile) throws Exception {
+ private JoltTransform createTransform(final ProcessContext context, final
FlowFile flowFile) throws Exception {
final Object specJson;
- if (context.getProperty(JOLT_SPEC).isSet() &&
!SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
- specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET);
+ if ((context.getProperty(JOLT_SPEC).isSet() &&
!SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue()))) {
+ final String resolvedSpec =
readTransform(context.getProperty(JOLT_SPEC), flowFile);
+ specJson = JsonUtils.jsonToObject(resolvedSpec, DEFAULT_CHARSET);
} else {
specJson = null;
}
@@ -491,6 +523,10 @@ public class JoltTransformRecord extends AbstractProcessor
{
? ((ContextualTransform) joltTransform).transform(input,
Collections.emptyMap()) : ((Transform) joltTransform).transform(input);
}
+ protected FilenameFilter getJarFilenameFilter(){
+ return (dir, name) -> (name != null && name.endsWith(".jar"));
+ }
+
/**
* Recursively replace List objects with Object[]. JOLT expects arrays to
be of type List where our Record code uses Object[].
*
diff --git
a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
index bdbcbf61af..dd6b7af8c1 100644
---
a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
+++
b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
@@ -102,6 +102,24 @@ public class TestJoltTransformRecord {
assertEquals(3, relationships.size());
}
+ @Test
+ public void testRelationshipsCreatedFromFile() throws IOException {
+ generateTestData(1, null);
+ final String outputSchemaText = new
String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")));
+ runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY,
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT,
outputSchemaText);
+ runner.setProperty(writer, "Pretty Print JSON", "true");
+ runner.enableControllerService(writer);
+ final String spec =
"./src/test/resources/TestJoltTransformRecord/chainrSpec.json";
+ runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+ runner.enqueue(new byte[0]);
+ Set<Relationship> relationships = processor.getRelationships();
+ assertTrue(relationships.contains(JoltTransformRecord.REL_FAILURE));
+ assertTrue(relationships.contains(JoltTransformRecord.REL_SUCCESS));
+ assertTrue(relationships.contains(JoltTransformRecord.REL_ORIGINAL));
+ assertEquals(3, relationships.size());
+ }
+
@Test
public void testInvalidJOLTSpec() throws IOException {
generateTestData(1, null);
@@ -110,9 +128,14 @@ public class TestJoltTransformRecord {
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT,
outputSchemaText);
runner.setProperty(writer, "Pretty Print JSON", "true");
runner.enableControllerService(writer);
- final String spec = "[{}]";
+ String spec = "[{}]";
runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
runner.assertNotValid();
+
+ final String specLocation =
"src/test/resources/TestJoltTransformRecord/chainrSpec.json";
+ spec = new String(Files.readAllBytes(Paths.get(specLocation)));
+ runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+ runner.assertValid();
}
@Test
@@ -277,7 +300,7 @@ public class TestJoltTransformRecord {
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
-runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed =
runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),
"application/json");
@@ -299,7 +322,7 @@
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
-runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed =
runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),
"application/json");
@@ -332,7 +355,7 @@
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
put("b", 2);
put("c", 3);
}});
- final Object[] recordArray1 = new Object[] {record1, record2, record3};
+ final Object[] recordArray1 = new Object[]{record1, record2, record3};
parser.addRecord((Object) recordArray1);
final Record record4 = new MapRecord(xSchema, new HashMap<String,
Object>() {{
@@ -345,7 +368,7 @@
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
put("b", 201);
put("c", 301);
}});
- final Object[] recordArray2 = new Object[] {record4, record5};
+ final Object[] recordArray2 = new Object[]{record4, record5};
parser.addRecord((Object) recordArray2);
final String outputSchemaText = new
String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchemaMultipleOutputRecords.avsc")));
@@ -365,7 +388,28 @@
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),
"application/json");
assertEquals(new
String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputMultipleOutputRecords.json"))),
new String(transformed.toByteArray()));
+ }
+ @Test
+ public void testTransformInputWithShiftrFromFile() throws IOException {
+ generateTestData(1, null);
+ final String outputSchemaText = new
String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc")));
+ runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY,
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT,
outputSchemaText);
+ runner.setProperty(writer, "Pretty Print JSON", "true");
+ runner.enableControllerService(writer);
+ final String spec =
"./src/test/resources/TestJoltTransformRecord/shiftrSpec.json";
+ runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+ runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM,
JoltTransformRecord.SHIFTR);
+ runner.enqueue(new byte[0]);
+ runner.run();
+ runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
+ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+ final MockFlowFile transformed =
runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
+ transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
+ transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),
"application/json");
+ assertEquals(new
String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutput.json"))),
+ new String(transformed.toByteArray()));
}
@Test
@@ -382,7 +426,7 @@
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
-runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed =
runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
assertEquals(new
String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json"))),
new String(transformed.toByteArray()));
@@ -402,7 +446,7 @@
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
-runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed =
runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
assertEquals(new
String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/removrOutput.json"))),
new String(transformed.toByteArray()));
@@ -423,7 +467,7 @@
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
-runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed =
runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
assertEquals(new
String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/cardrOutput.json"))),
new String(transformed.toByteArray()));
@@ -442,7 +486,7 @@
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
-runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed =
runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),
"application/json");
@@ -465,7 +509,7 @@
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
-runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed =
runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
assertEquals(new
String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrELOutput.json"))),
new String(transformed.toByteArray()));
@@ -506,7 +550,7 @@
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
-runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed =
runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
assertEquals(new
String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefineOutput.json"))),
new String(transformed.toByteArray()));
@@ -526,7 +570,7 @@
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
-runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed =
runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
assertEquals(new
String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierOverwriteOutput.json"))),
new String(transformed.toByteArray()));
@@ -545,7 +589,7 @@
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
-runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed =
runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),
"application/json");
@@ -570,7 +614,7 @@
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
-runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed =
runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),
"application/json");
@@ -684,5 +728,4 @@
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
recordGenerator.apply(numRecords, parser);
}
}
-
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
index 040a9bd36e..7542cd4564 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
@@ -31,9 +31,11 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceReference;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@@ -44,7 +46,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.jolt.TransformFactory;
import org.apache.nifi.processors.standard.util.jolt.TransformUtils;
@@ -52,10 +53,12 @@ import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
+import java.io.BufferedReader;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.InputStreamReader;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -64,13 +67,14 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
@EventDriven
@SideEffectFree
@SupportsBatching
-@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr",
"removr","cardinality","sort"})
+@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr",
"cardinality", "sort"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@WritesAttribute(attribute = "mime.type",description = "Always set to
application/json")
+@WritesAttribute(attribute = "mime.type", description = "Always set to
application/json")
@CapabilityDescription("Applies a list of Jolt specifications to the flowfile
JSON payload. A new FlowFile is created "
+ "with transformed content and is routed to the 'success'
relationship. If the JSON transform "
+ "fails, the original FlowFile is routed to the 'failure'
relationship.")
@@ -100,9 +104,12 @@ public class JoltTransformJSON extends AbstractProcessor {
public static final PropertyDescriptor JOLT_SPEC = new
PropertyDescriptor.Builder()
.name("jolt-spec")
.displayName("Jolt Specification")
- .description("Jolt Specification for transform of JSON data. This
value is ignored if the Jolt Sort Transformation is selected.")
+ .description("Jolt Specification for transformation of JSON data.
The value for this property may be the text of a Jolt specification "
+ + "or the path to a file containing a Jolt specification.
'Jolt Specification' must be set, or "
+ + "the value is ignored if the Jolt Sort Transformation is
selected.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .identifiesExternalResource(ResourceCardinality.SINGLE,
ResourceType.FILE, ResourceType.TEXT)
.required(false)
.build();
@@ -130,7 +137,7 @@ public class JoltTransformJSON extends AbstractProcessor {
static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new
PropertyDescriptor.Builder()
.name("Transform Cache Size")
.description("Compiling a Jolt Transform can be fairly expensive.
Ideally, this will be done only once. However, if the Expression Language is
used in the transform, we may need "
- + "a new Transform for each FlowFile. This value controls how
many of those Transforms we cache in memory in order to avoid having to compile
the Transform each time.")
+ + "a new Transform for each FlowFile. This value controls
how many of those Transforms we cache in memory in order to avoid having to
compile the Transform each time.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("1")
@@ -198,15 +205,12 @@ public class JoltTransformJSON extends AbstractProcessor {
final List<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
final String transform =
validationContext.getProperty(JOLT_TRANSFORM).getValue();
final String customTransform =
validationContext.getProperty(CUSTOM_CLASS).getValue();
- final String modulePath =
validationContext.getProperty(MODULES).isSet()?
validationContext.getProperty(MODULES).getValue() : null;
+ final String modulePath =
validationContext.getProperty(MODULES).isSet() ?
validationContext.getProperty(MODULES).getValue() : null;
+ final String joltSpecBody =
validationContext.getProperty(JOLT_SPEC).getValue();
- if(!validationContext.getProperty(JOLT_SPEC).isSet() ||
StringUtils.isEmpty(validationContext.getProperty(JOLT_SPEC).getValue())){
- if(!SORTR.getValue().equals(transform)) {
- final String message = "A specification is required for this
transformation";
- results.add(new ValidationResult.Builder().valid(false)
- .explanation(message)
- .build());
- }
+ if (StringUtils.isEmpty(joltSpecBody) &&
!SORTR.getValue().equals(transform)) {
+ results.add(new
ValidationResult.Builder().subject(JOLT_SPEC.getDisplayName()).valid(false).explanation(
+ "'Jolt Specification' must be set, or the Transformation
must be 'Sort'").build());
} else {
final ClassLoader customClassLoader;
@@ -214,12 +218,14 @@ public class JoltTransformJSON extends AbstractProcessor {
if (modulePath != null &&
!validationContext.isExpressionLanguagePresent(modulePath)) {
customClassLoader =
ClassLoaderUtils.getCustomClassLoader(modulePath,
this.getClass().getClassLoader(), getJarFilenameFilter());
} else {
- customClassLoader = this.getClass().getClassLoader();
+ customClassLoader = this.getClass().getClassLoader();
}
- final String specValue =
validationContext.getProperty(JOLT_SPEC).getValue();
+ String specValue =
validationContext.getProperty(JOLT_SPEC).getValue();
- if (validationContext.isExpressionLanguagePresent(specValue)) {
+ final boolean elPresent =
validationContext.isExpressionLanguagePresent(specValue);
+
+ if (elPresent) {
final String invalidExpressionMsg =
validationContext.newExpressionLanguageCompiler().validateExpression(specValue,
true);
if (!StringUtils.isEmpty(invalidExpressionMsg)) {
results.add(new ValidationResult.Builder().valid(false)
@@ -236,26 +242,31 @@ public class JoltTransformJSON extends AbstractProcessor {
.build());
}
} else {
- //for validation we want to be able to ensure the spec is
syntactically correct and not try to resolve variables since they may not exist
yet
- Object specJson = SORTR.getValue().equals(transform) ?
null : JsonUtils.jsonToObject(specValue.replaceAll("\\$\\{","\\\\\\\\\\$\\{"),
DEFAULT_CHARSET);
-
- if (CUSTOMR.getValue().equals(transform)) {
- if (StringUtils.isEmpty(customTransform)) {
- final String customMessage = "A custom
transformation class should be provided. ";
- results.add(new
ValidationResult.Builder().valid(false)
- .explanation(customMessage)
- .build());
+ if (!SORTR.getValue().equals(transform)) {
+
+ ///for validation we want to be able to ensure the
spec is syntactically correct and not try to resolve variables since they may
not exist yet
+ final String content =
readTransform(validationContext.getProperty(JOLT_SPEC));
+ final Object specJson =
JsonUtils.jsonToObject(content.replaceAll("\\$\\{", "\\\\\\\\\\$\\{"),
DEFAULT_CHARSET);
+
+ if (CUSTOMR.getValue().equals(transform)) {
+ if (StringUtils.isEmpty(customTransform)) {
+ final String customMessage = "A custom
transformation class should be provided. ";
+ results.add(new
ValidationResult.Builder().valid(false)
+ .explanation(customMessage)
+ .build());
+ } else {
+
TransformFactory.getCustomTransform(customClassLoader, customTransform,
specJson);
+ }
} else {
-
TransformFactory.getCustomTransform(customClassLoader, customTransform,
specJson);
+ TransformFactory.getTransform(customClassLoader,
transform, specJson);
}
- } else {
- TransformFactory.getTransform(customClassLoader,
transform, specJson);
}
}
} catch (final Exception e) {
- getLogger().error("processor is not valid: ", e);
- String message = "Specification not valid for the selected
transformation." ;
- results.add(new ValidationResult.Builder().valid(false)
+ String message = String.format("Specification not valid for
the selected transformation: %s", e);
+ results.add(new ValidationResult.Builder()
+ .valid(false)
+ .subject(JOLT_SPEC.getDisplayName())
.explanation(message)
.build());
}
@@ -278,7 +289,7 @@ public class JoltTransformJSON extends AbstractProcessor {
try (final InputStream in = session.read(original)) {
inputJson = JsonUtils.jsonToObject(in);
} catch (final Exception e) {
- logger.error("Failed to transform {}; routing to failure", new
Object[] {original, e});
+ logger.error("JSON parsing failed for {}", original, e);
session.transfer(original, REL_FAILURE);
return;
}
@@ -291,10 +302,10 @@ public class JoltTransformJSON extends AbstractProcessor {
Thread.currentThread().setContextClassLoader(customClassLoader);
}
- final Object transformedJson =
TransformUtils.transform(transform,inputJson);
+ final Object transformedJson = TransformUtils.transform(transform,
inputJson);
jsonString = context.getProperty(PRETTY_PRINT).asBoolean() ?
JsonUtils.toPrettyJsonString(transformedJson) :
JsonUtils.toJsonString(transformedJson);
- } catch (final Exception ex) {
- logger.error("Unable to transform {} due to {}", new Object[]
{original, ex.toString(), ex});
+ } catch (final Exception e) {
+ logger.error("Transform failed for {}", original, e);
session.transfer(original, REL_FAILURE);
return;
} finally {
@@ -303,33 +314,30 @@ public class JoltTransformJSON extends AbstractProcessor {
}
}
- FlowFile transformed = session.write(original, new
OutputStreamCallback() {
- @Override
- public void process(OutputStream out) throws IOException {
- out.write(jsonString.getBytes(DEFAULT_CHARSET));
- }
- });
+ FlowFile transformed = session.write(original, out ->
out.write(jsonString.getBytes(DEFAULT_CHARSET)));
final String transformType =
context.getProperty(JOLT_TRANSFORM).getValue();
transformed = session.putAttribute(transformed,
CoreAttributes.MIME_TYPE.key(), "application/json");
session.transfer(transformed, REL_SUCCESS);
- session.getProvenanceReporter().modifyContent(transformed,"Modified
With " + transformType ,stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- logger.info("Transformed {}", new Object[]{original});
+ session.getProvenanceReporter().modifyContent(transformed, "Modified
With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ logger.info("Transform completed for {}", original);
}
private JoltTransform getTransform(final ProcessContext context, final
FlowFile flowFile) {
final Optional<String> specString;
if (context.getProperty(JOLT_SPEC).isSet()) {
specString =
Optional.of(context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue());
- } else {
+ } else if
(SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
specString = Optional.empty();
+ } else {
+ throw new IllegalArgumentException("'Jolt Specification' must be
set, or the Transformation must be Sort.");
}
return transformCache.get(specString, currString -> {
try {
- return createTransform(context, currString.orElse(null),
flowFile);
+ return createTransform(context, flowFile);
} catch (Exception e) {
- getLogger().error("Problem getting transform", e);
+ getLogger().error("Transform creation failed", e);
}
return null;
});
@@ -352,15 +360,16 @@ public class JoltTransformJSON extends AbstractProcessor {
} else {
customClassLoader = this.getClass().getClassLoader();
}
- } catch (final Exception ex) {
- getLogger().error("Unable to setup processor", ex);
+ } catch (final Exception e) {
+ getLogger().error("ClassLoader configuration failed", e);
}
}
- private JoltTransform createTransform(final ProcessContext context, final
String specString, final FlowFile flowFile) throws Exception {
+ private JoltTransform createTransform(final ProcessContext context, final
FlowFile flowFile) throws Exception {
final Object specJson;
- if (context.getProperty(JOLT_SPEC).isSet() &&
!SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
- specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET);
+ if ((context.getProperty(JOLT_SPEC).isSet() &&
!SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue()))) {
+ final String resolvedSpec =
readTransform(context.getProperty(JOLT_SPEC), flowFile);
+ specJson = JsonUtils.jsonToObject(resolvedSpec, DEFAULT_CHARSET);
} else {
specJson = null;
}
@@ -372,8 +381,28 @@ public class JoltTransformJSON extends AbstractProcessor {
}
}
- protected FilenameFilter getJarFilenameFilter(){
- return (dir, name) -> (name != null && name.endsWith(".jar"));
+ private String readTransform(final PropertyValue propertyValue, final
FlowFile flowFile) {
+ final String transform;
+
+ if (propertyValue.isExpressionLanguagePresent()) {
+ transform =
propertyValue.evaluateAttributeExpressions(flowFile).getValue();
+ } else {
+ transform = readTransform(propertyValue);
+ }
+
+ return transform;
}
+ private String readTransform(final PropertyValue propertyValue) {
+ final ResourceReference resourceReference = propertyValue.asResource();
+ try (final BufferedReader reader = new BufferedReader(new
InputStreamReader(resourceReference.read()))) {
+ return reader.lines().collect(Collectors.joining());
+ } catch (final IOException e) {
+ throw new UncheckedIOException("Read JOLT Transform failed", e);
+ }
+ }
+
+ protected FilenameFilter getJarFilenameFilter() {
+ return (dir, name) -> (name != null && name.endsWith(".jar"));
+ }
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java
index d79690c877..3a37d6c698 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java
@@ -59,11 +59,29 @@ public class TestJoltTransformJSON {
}
@Test
- public void testInvalidJOLTSpec() {
+ public void testRelationshipsCreatedFromFile() throws IOException{
+ Processor processor= new JoltTransformJSON();
+ final TestRunner runner = TestRunners.newTestRunner(processor);
+ final String spec =
"./src/test/resources/TestJoltTransformJson/chainrSpec.json";
+ runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
+ runner.enqueue(JSON_INPUT);
+ Set<Relationship> relationships = processor.getRelationships();
+ assertTrue(relationships.contains(JoltTransformJSON.REL_FAILURE));
+ assertTrue(relationships.contains(JoltTransformJSON.REL_SUCCESS));
+ assertEquals(2, relationships.size());
+ }
+
+ @Test
+ public void testInvalidJOLTSpec() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new
JoltTransformJSON());
- final String spec = "[{}]";
+ String spec = "[{}]";
runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
runner.assertNotValid();
+
+ final String specLocation =
"src/test/resources/TestJoltTransformJson/chainrSpec.json";
+ spec = new String(Files.readAllBytes(Paths.get(specLocation)));
+ runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
+ runner.assertValid();
}
@Test
@@ -76,7 +94,16 @@ public class TestJoltTransformJSON {
}
@Test
- public void testSpecIsNotSet() {
+ public void testIncorrectJOLTSpecFromFile() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new
JoltTransformJSON());
+ final String chainrSpec =
"./src/test/resources/TestJoltTransformJson/chainrSpec.json";
+ runner.setProperty(JoltTransformJSON.JOLT_SPEC, chainrSpec);
+ runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,
JoltTransformJSON.SHIFTR);
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testSpecIsNotSet() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new
JoltTransformJSON());
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,
JoltTransformJSON.SHIFTR);
runner.assertNotValid();
@@ -118,6 +145,16 @@ public class TestJoltTransformJSON {
runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_FAILURE);
}
+ @Test
+ public void testInvalidFlowFileContentJsonFromFile() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new
JoltTransformJSON());
+ final String spec =
"./src/test/resources/TestJoltTransformJson/chainrSpec.json";
+ runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
+ runner.enqueue("invalid json");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_FAILURE);
+ }
+
@Test
public void testCustomTransformationWithNoModule() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new
JoltTransformJSON());
@@ -201,6 +238,23 @@ public class TestJoltTransformJSON {
assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
}
+ @Test
+ public void testTransformInputWithShiftrFromFile() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new
JoltTransformJSON());
+ final String spec =
"./src/test/resources/TestJoltTransformJson/shiftrSpec.json";
+ runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
+ runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,
JoltTransformJSON.SHIFTR);
+ runner.enqueue(JSON_INPUT);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
+ final MockFlowFile transformed =
runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).get(0);
+ transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
+
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json");
+ Object transformedJson = JsonUtils.jsonToObject(new
ByteArrayInputStream(transformed.toByteArray()));
+ Object compareJson =
JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestJoltTransformJson/shiftrOutput.json")));
+ assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
+ }
+
@Test
public void testTransformInputWithDefaultr() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new
JoltTransformJSON());