NIFI-1079 Create a new Attribute from the existing FlowFile attributes by taking
either all of the existing attributes or a user-defined list. The
existing Attributes are converted to JSON and placed in a new Attribute
named “JSONAttributes” on the existing FlowFile.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/19b7a4cc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/19b7a4cc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/19b7a4cc

Branch: refs/heads/NIFI-730
Commit: 19b7a4cc7de9da3366aa9c1a15b97c683242b4d0
Parents: 5cc2b04
Author: Jeremy Dyer <[email protected]>
Authored: Wed Oct 28 16:20:48 2015 -0400
Committer: Bryan Bende <[email protected]>
Committed: Fri Oct 30 14:29:29 2015 -0400

----------------------------------------------------------------------
 .../processors/standard/AttributesToJSON.java   | 117 ++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../standard/TestAttributesToJSON.java          | 177 +++++++++++++++++++
 3 files changed, 295 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/19b7a4cc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
new file mode 100644
index 0000000..7098b6e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
@@ -0,0 +1,117 @@
+package org.apache.nifi.processors.standard;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.*;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.*;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.*;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"JSON", "attributes"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Evaluates the attributes from a FlowFile and generates 
a JSON string with the attribute key/value pair. " +
+        "The resulting JSON string is placed in the FlowFile as a new 
Attribute with the name 'JSONAttributes'. By default all" +
+        "Attributes in the FlowFile are placed in the resulting JSON string. 
If only certain Attributes are desired you may" +
+        "specify a list of FlowFile Attributes that you want in the resulting 
JSON string")
+@WritesAttribute(attribute = "JSONAttributes", description = "JSON string of 
all the pre-existing attributes in the flowfile")
+public class AttributesToJSON extends AbstractProcessor {
+
+    public static final String JSON_ATTRIBUTE_NAME = "JSONAttribute";
+    private static final String AT_LIST_SEPARATOR = ",";
+    private static final String DEFAULT_VALUE_IF_ATT_NOT_PRESENT = "";
+
+    public static final PropertyDescriptor ATTRIBUTES_LIST = new 
PropertyDescriptor.Builder()
+            .name("Attributes List")
+            .description("Comma separated list of attributes to be included in 
the '" + JSON_ATTRIBUTE_NAME +"' attribute. This list of attributes is case 
sensitive. If a specified attribute is not found" +
+                    "in the flowfile an empty string will be output for that 
attritbute in the resulting JSON")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+
+    public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+            .description("'" + JSON_ATTRIBUTE_NAME + "' attribute has been 
successfully added to the flowfile").build();
+    public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+            .description("Failed to add '" + JSON_ATTRIBUTE_NAME + "' 
attribute to the flowfile").build();
+
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(ATTRIBUTES_LIST);
+        this.properties = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final String atList = context.getProperty(ATTRIBUTES_LIST).getValue();
+        Map<String, String> atsToWrite = null;
+
+        //If list of attributes specified get only those attributes. Otherwise 
write them all
+        if (atList != null && !StringUtils.isEmpty(atList)) {
+            atsToWrite = new HashMap<>();
+            String[] ats = StringUtils.split(atList, AT_LIST_SEPARATOR);
+            if (ats != null) {
+                for (String str : ats) {
+                    String cleanStr = str.trim();
+                    String val = original.getAttribute(cleanStr);
+                    if (val != null) {
+                        atsToWrite.put(cleanStr, val);
+                    } else {
+                        atsToWrite.put(cleanStr, 
DEFAULT_VALUE_IF_ATT_NOT_PRESENT);
+                    }
+                }
+            }
+        } else {
+            atsToWrite = original.getAttributes();
+        }
+
+        if (atsToWrite != null) {
+            if (atsToWrite.size() == 0) {
+                getLogger().debug("Flowfile contains no attributes to convert 
to JSON");
+            } else {
+                try {
+                    FlowFile updated = session.putAttribute(original, 
JSON_ATTRIBUTE_NAME, objectMapper.writeValueAsString(atsToWrite));
+                    session.transfer(updated, REL_SUCCESS);
+                } catch (JsonProcessingException e) {
+                    getLogger().error(e.getMessage());
+                    session.transfer(original, REL_FAILURE);
+                }
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/19b7a4cc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index b12fb6f..8507c96 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+org.apache.nifi.processors.standard.AttributesToJSON
 org.apache.nifi.processors.standard.Base64EncodeContent
 org.apache.nifi.processors.standard.CompressContent
 org.apache.nifi.processors.standard.ControlRate

http://git-wip-us.apache.org/repos/asf/nifi/blob/19b7a4cc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
new file mode 100644
index 0000000..a057d15
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
@@ -0,0 +1,177 @@
+package org.apache.nifi.processors.standard;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class TestAttributesToJSON {
+
+    private static Logger LOGGER;
+
+    static {
+        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
+        
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.AttributesToJSON",
 "debug");
+        
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestAttributesToJSON",
 "debug");
+        LOGGER = LoggerFactory.getLogger(TestAttributesToJSON.class);
+    }
+
+    private static final String TEST_ATTRIBUTE_KEY = "TestAttribute";
+    private static final String TEST_ATTRIBUTE_VALUE = "TestValue";
+
+    @Test(expected = AssertionError.class)
+    public void testInvalidUserSuppliedAttributeList() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new 
AttributesToJSON());
+
+        //Attribute list CANNOT be empty
+        testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, "");
+
+        ProcessSession session = 
testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+    }
+
+    @Test
+    public void testInvalidJSONValueInAttribute() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new 
AttributesToJSON());
+
+        ProcessSession session = 
testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+
+        //Create attribute that contains an invalid JSON Character
+        ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, "'badjson'");
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+
+        //Expecting success transition because Jackson is taking care of 
escaping the bad JSON characters
+        
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
+                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+    }
+
+
+    @Test
+    public void testAttribuets_emptyListUserSpecifiedAttributes() throws 
Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new 
AttributesToJSON());
+
+        ProcessSession session = 
testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+
+        ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, 
TEST_ATTRIBUTE_VALUE);
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+
+        
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
+                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+
+        String json = 
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
+                .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, String> val = mapper.readValue(json, HashMap.class);
+        assertTrue(val.get(TEST_ATTRIBUTE_KEY).equals(TEST_ATTRIBUTE_VALUE));
+    }
+
+
+    @Test
+    public void testAttribute_singleUserDefinedAttribute() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new 
AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, 
TEST_ATTRIBUTE_KEY);
+
+        ProcessSession session = 
testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+        ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, 
TEST_ATTRIBUTE_VALUE);
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+
+        
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
+                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+
+        String json = 
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
+                .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, String> val = mapper.readValue(json, HashMap.class);
+        assertTrue(val.get(TEST_ATTRIBUTE_KEY).equals(TEST_ATTRIBUTE_VALUE));
+        assertTrue(val.size() == 1);
+    }
+
+
+    @Test
+    public void testAttribute_singleUserDefinedAttributeWithWhiteSpace() 
throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new 
AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, " " + 
TEST_ATTRIBUTE_KEY + " ");
+
+        ProcessSession session = 
testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+        ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, 
TEST_ATTRIBUTE_VALUE);
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+
+        
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
+                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+
+        String json = 
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
+                .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, String> val = mapper.readValue(json, HashMap.class);
+        assertTrue(val.get(TEST_ATTRIBUTE_KEY).equals(TEST_ATTRIBUTE_VALUE));
+        assertTrue(val.size() == 1);
+    }
+
+
+    @Test
+    public void testAttribute_singleNonExistingUserDefinedAttribute() throws 
Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new 
AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, 
"NonExistingAttribute");
+
+        ProcessSession session = 
testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+        ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, 
TEST_ATTRIBUTE_VALUE);
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+
+        
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
+                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+
+        String json = 
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
+                .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, String> val = mapper.readValue(json, HashMap.class);
+
+        //If a Attribute is requested but does not exist then it is placed in 
the JSON with an empty string
+        assertTrue(val.get("NonExistingAttribute").equals(""));
+        assertTrue(val.size() == 1);
+    }
+
+}

Reply via email to