NIFI-1079 Create a new Attribute from the existing FlowFile attributes by taking either all of the existing attributes or a user-defined list. The existing Attributes are converted to JSON and placed in a new Attribute on the existing FlowFile as Attribute 'JSONAttributes'.
Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/19b7a4cc Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/19b7a4cc Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/19b7a4cc Branch: refs/heads/NIFI-730 Commit: 19b7a4cc7de9da3366aa9c1a15b97c683242b4d0 Parents: 5cc2b04 Author: Jeremy Dyer <[email protected]> Authored: Wed Oct 28 16:20:48 2015 -0400 Committer: Bryan Bende <[email protected]> Committed: Fri Oct 30 14:29:29 2015 -0400 ---------------------------------------------------------------------- .../processors/standard/AttributesToJSON.java | 117 ++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../standard/TestAttributesToJSON.java | 177 +++++++++++++++++++ 3 files changed, 295 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/19b7a4cc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java new file mode 100644 index 0000000..7098b6e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java @@ -0,0 +1,117 @@ +package org.apache.nifi.processors.standard; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.*; +import 
org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.*; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.*; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"JSON", "attributes"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Evaluates the attributes from a FlowFile and generates a JSON string with the attribute key/value pair. " + + "The resulting JSON string is placed in the FlowFile as a new Attribute with the name 'JSONAttributes'. By default all" + + "Attributes in the FlowFile are placed in the resulting JSON string. If only certain Attributes are desired you may" + + "specify a list of FlowFile Attributes that you want in the resulting JSON string") +@WritesAttribute(attribute = "JSONAttributes", description = "JSON string of all the pre-existing attributes in the flowfile") +public class AttributesToJSON extends AbstractProcessor { + + public static final String JSON_ATTRIBUTE_NAME = "JSONAttribute"; + private static final String AT_LIST_SEPARATOR = ","; + private static final String DEFAULT_VALUE_IF_ATT_NOT_PRESENT = ""; + + public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder() + .name("Attributes List") + .description("Comma separated list of attributes to be included in the '" + JSON_ATTRIBUTE_NAME +"' attribute. This list of attributes is case sensitive. 
If a specified attribute is not found" + + "in the flowfile an empty string will be output for that attritbute in the resulting JSON") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("'" + JSON_ATTRIBUTE_NAME + "' attribute has been successfully added to the flowfile").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("Failed to add '" + JSON_ATTRIBUTE_NAME + "' attribute to the flowfile").build(); + + private List<PropertyDescriptor> properties; + private Set<Relationship> relationships; + private static final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(ATTRIBUTES_LIST); + this.properties = Collections.unmodifiableList(properties); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final FlowFile original = session.get(); + if (original == null) { + return; + } + + final String atList = context.getProperty(ATTRIBUTES_LIST).getValue(); + Map<String, String> atsToWrite = null; + + //If list of attributes specified get only those attributes. 
Otherwise write them all + if (atList != null && !StringUtils.isEmpty(atList)) { + atsToWrite = new HashMap<>(); + String[] ats = StringUtils.split(atList, AT_LIST_SEPARATOR); + if (ats != null) { + for (String str : ats) { + String cleanStr = str.trim(); + String val = original.getAttribute(cleanStr); + if (val != null) { + atsToWrite.put(cleanStr, val); + } else { + atsToWrite.put(cleanStr, DEFAULT_VALUE_IF_ATT_NOT_PRESENT); + } + } + } + } else { + atsToWrite = original.getAttributes(); + } + + if (atsToWrite != null) { + if (atsToWrite.size() == 0) { + getLogger().debug("Flowfile contains no attributes to convert to JSON"); + } else { + try { + FlowFile updated = session.putAttribute(original, JSON_ATTRIBUTE_NAME, objectMapper.writeValueAsString(atsToWrite)); + session.transfer(updated, REL_SUCCESS); + } catch (JsonProcessingException e) { + getLogger().error(e.getMessage()); + session.transfer(original, REL_FAILURE); + } + } + } + + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/19b7a4cc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index b12fb6f..8507c96 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License. +org.apache.nifi.processors.standard.AttributesToJSON org.apache.nifi.processors.standard.Base64EncodeContent org.apache.nifi.processors.standard.CompressContent org.apache.nifi.processors.standard.ControlRate http://git-wip-us.apache.org/repos/asf/nifi/blob/19b7a4cc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java new file mode 100644 index 0000000..a057d15 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java @@ -0,0 +1,177 @@ +package org.apache.nifi.processors.standard; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertTrue; + + +public class TestAttributesToJSON { + + private static Logger LOGGER; + + static { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.AttributesToJSON", "debug"); + 
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestAttributesToJSON", "debug"); + LOGGER = LoggerFactory.getLogger(TestAttributesToJSON.class); + } + + private static final String TEST_ATTRIBUTE_KEY = "TestAttribute"; + private static final String TEST_ATTRIBUTE_VALUE = "TestValue"; + + @Test(expected = AssertionError.class) + public void testInvalidUserSuppliedAttributeList() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + + //Attribute list CANNOT be empty + testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, ""); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + testRunner.enqueue(ff); + testRunner.run(); + } + + @Test + public void testInvalidJSONValueInAttribute() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + //Create attribute that contains an invalid JSON Character + ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, "'badjson'"); + + testRunner.enqueue(ff); + testRunner.run(); + + //Expecting success transition because Jackson is taking care of escaping the bad JSON characters + testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0). 
+ assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME); + testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0); + } + + + @Test + public void testAttribuets_emptyListUserSpecifiedAttributes() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE); + + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0). + assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME); + testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0); + + String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS) + .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME); + + ObjectMapper mapper = new ObjectMapper(); + Map<String, String> val = mapper.readValue(json, HashMap.class); + assertTrue(val.get(TEST_ATTRIBUTE_KEY).equals(TEST_ATTRIBUTE_VALUE)); + } + + + @Test + public void testAttribute_singleUserDefinedAttribute() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, TEST_ATTRIBUTE_KEY); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE); + + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0). 
+ assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME); + testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0); + + String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS) + .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME); + + ObjectMapper mapper = new ObjectMapper(); + Map<String, String> val = mapper.readValue(json, HashMap.class); + assertTrue(val.get(TEST_ATTRIBUTE_KEY).equals(TEST_ATTRIBUTE_VALUE)); + assertTrue(val.size() == 1); + } + + + @Test + public void testAttribute_singleUserDefinedAttributeWithWhiteSpace() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, " " + TEST_ATTRIBUTE_KEY + " "); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE); + + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0). 
+ assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME); + testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0); + + String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS) + .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME); + + ObjectMapper mapper = new ObjectMapper(); + Map<String, String> val = mapper.readValue(json, HashMap.class); + assertTrue(val.get(TEST_ATTRIBUTE_KEY).equals(TEST_ATTRIBUTE_VALUE)); + assertTrue(val.size() == 1); + } + + + @Test + public void testAttribute_singleNonExistingUserDefinedAttribute() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, "NonExistingAttribute"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE); + + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0). + assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME); + testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0); + + String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS) + .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME); + + ObjectMapper mapper = new ObjectMapper(); + Map<String, String> val = mapper.readValue(json, HashMap.class); + + //If a Attribute is requested but does not exist then it is placed in the JSON with an empty string + assertTrue(val.get("NonExistingAttribute").equals("")); + assertTrue(val.size() == 1); + } + +}
