Repository: nifi
Updated Branches:
  refs/heads/master be6bcf20a -> 1b4729e44


NIFI-3240 - AttributesToJson performance improvements

This closes #1352.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1b4729e4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1b4729e4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1b4729e4

Branch: refs/heads/master
Commit: 1b4729e448394177cddeccb700f9215d6ec7b570
Parents: be6bcf2
Author: Bryan Rosander <[email protected]>
Authored: Wed Dec 21 14:19:57 2016 -0500
Committer: Bryan Bende <[email protected]>
Committed: Tue Jan 3 10:20:16 2017 -0500

----------------------------------------------------------------------
 .../processors/standard/AttributesToJSON.java   | 128 +++++++++----------
 .../standard/TestAttributesToJSON.java          | 111 +++++++++++++++-
 2 files changed, 171 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/1b4729e4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
index 78b8d58..cfa4cfe 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
@@ -27,6 +27,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -36,20 +37,19 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.StreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.stream.io.BufferedOutputStream;
 
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.BufferedOutputStream;
 import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Set;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.HashMap;
 import java.util.Collections;
+import java.util.stream.Collectors;
 
 @EventDriven
 @SideEffectFree
@@ -66,7 +66,7 @@ public class AttributesToJSON extends AbstractProcessor {
 
     public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
     public static final String DESTINATION_CONTENT = "flowfile-content";
-    private static final String APPLICATION_JSON = "application/json";
+    public static final String APPLICATION_JSON = "application/json";
 
 
     public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder()
@@ -116,6 +116,10 @@ public class AttributesToJSON extends AbstractProcessor {
     private List<PropertyDescriptor> properties;
     private Set<Relationship> relationships;
     private static final ObjectMapper objectMapper = new ObjectMapper();
+    private volatile Set<String> attributesToRemove;
+    private volatile Set<String> attributes;
+    private volatile Boolean nullValueForEmptyString;
+    private volatile boolean destinationContent;
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -149,55 +153,57 @@ public class AttributesToJSON extends AbstractProcessor {
      * @return
      *  Map of values that are feed to a Jackson ObjectMapper
      */
-    protected Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, String atrList,
-                                                                boolean includeCoreAttributes,
-                                                                boolean nullValForEmptyString) {
-
-        Map<String, String> atsToWrite = new HashMap<>();
+    protected Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, Set<String> attributes, Set<String> attributesToRemove, boolean nullValForEmptyString) {
+        Map<String, String> result;
+        //If list of attributes specified get only those attributes. Otherwise write them all
+        if (attributes != null) {
+            result = new HashMap<>(attributes.size());
+            for (String attribute : attributes) {
+                String val = ff.getAttribute(attribute);
+                if (val != null || nullValForEmptyString) {
+                    result.put(attribute, val);
+                } else {
+                    result.put(attribute, "");
+                }
+            }
+        } else {
+            Map<String, String> ffAttributes = ff.getAttributes();
+            result = new HashMap<>(ffAttributes.size());
+            for (Map.Entry<String, String> e : ffAttributes.entrySet()) {
+                if (!attributesToRemove.contains(e.getKey())) {
+                    result.put(e.getKey(), e.getValue());
+                }
+            }
+        }
+        return result;
+    }
 
+    private Set<String> buildAtrs(String atrList, Set<String> atrsToExclude) {
         //If list of attributes specified get only those attributes. Otherwise write them all
         if (StringUtils.isNotBlank(atrList)) {
             String[] ats = StringUtils.split(atrList, AT_LIST_SEPARATOR);
             if (ats != null) {
+                Set<String> result = new HashSet<>(ats.length);
                 for (String str : ats) {
-                    String cleanStr = str.trim();
-                    String val = ff.getAttribute(cleanStr);
-                    if (val != null) {
-                        atsToWrite.put(cleanStr, val);
-                    } else {
-                        if (nullValForEmptyString) {
-                            atsToWrite.put(cleanStr, null);
-                        } else {
-                            atsToWrite.put(cleanStr, "");
-                        }
+                    String trim = str.trim();
+                    if (!atrsToExclude.contains(trim)) {
+                        result.add(trim);
                     }
                 }
+                return result;
             }
-        } else {
-            atsToWrite.putAll(ff.getAttributes());
         }
-
-        if (!includeCoreAttributes) {
-            atsToWrite = removeCoreAttributes(atsToWrite);
-        }
-
-        return atsToWrite;
+        return null;
     }
 
-    /**
-     * Remove all of the CoreAttributes from the Attributes that will be written to the Flowfile.
-     *
-     * @param atsToWrite
-     *  List of Attributes that have already been generated including the CoreAttributes
-     *
-     * @return
-     *  Difference of all attributes minus the CoreAttributes
-     */
-    protected Map<String, String> removeCoreAttributes(Map<String, String> atsToWrite) {
-        for (CoreAttributes c : CoreAttributes.values()) {
-            atsToWrite.remove(c.key());
-        }
-        return atsToWrite;
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        attributesToRemove = context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean() ? Collections.EMPTY_SET : Arrays.stream(CoreAttributes.values())
+                .map(CoreAttributes::key)
+                .collect(Collectors.toSet());
+        attributes = buildAtrs(context.getProperty(ATTRIBUTES_LIST).getValue(), attributesToRemove);
+        nullValueForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean();
+        destinationContent = DESTINATION_CONTENT.equals(context.getProperty(DESTINATION).getValue());
     }
 
     @Override
@@ -207,33 +213,21 @@ public class AttributesToJSON extends AbstractProcessor {
             return;
         }
 
-        final Map<String, String> atrList = buildAttributesMapForFlowFile(original,
-                context.getProperty(ATTRIBUTES_LIST).getValue(),
-                context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean(),
-                context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean());
+        final Map<String, String> atrList = buildAttributesMapForFlowFile(original, attributes, attributesToRemove, nullValueForEmptyString);
 
         try {
-
-            switch (context.getProperty(DESTINATION).getValue()) {
-                case DESTINATION_ATTRIBUTE:
-                    FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME,
-                            objectMapper.writeValueAsString(atrList));
-                    session.transfer(atFlowfile, REL_SUCCESS);
-                    break;
-                case DESTINATION_CONTENT:
-                    FlowFile conFlowfile = session.write(original, new StreamCallback() {
-                        @Override
-                        public void process(InputStream in, OutputStream out) throws IOException {
-                            try (OutputStream outputStream = new BufferedOutputStream(out)) {
-                                outputStream.write(objectMapper.writeValueAsBytes(atrList));
-                            }
-                        }
-                    });
-                    conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
-                    session.transfer(conFlowfile, REL_SUCCESS);
-                    break;
+            if (destinationContent) {
+                FlowFile conFlowfile = session.write(original, (in, out) -> {
+                    try (OutputStream outputStream = new BufferedOutputStream(out)) {
+                        outputStream.write(objectMapper.writeValueAsBytes(atrList));
+                    }
+                });
+                conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
+                session.transfer(conFlowfile, REL_SUCCESS);
+            } else {
+                FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME, objectMapper.writeValueAsString(atrList));
+                session.transfer(atFlowfile, REL_SUCCESS);
             }
-
         } catch (JsonProcessingException e) {
             getLogger().error(e.getMessage());
             session.transfer(original, REL_FAILURE);

http://git-wip-us.apache.org/repos/asf/nifi/blob/1b4729e4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
index 0f9ec26..5c8df9b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
@@ -19,13 +19,20 @@ package org.apache.nifi.processors.standard;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -193,7 +200,6 @@ public class TestAttributesToJSON {
         testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).assertContentEquals("{}");
     }
 
-
     @Test
     public void testAttribute_singleUserDefinedAttribute() throws Exception {
         final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
@@ -279,4 +285,107 @@ public class TestAttributesToJSON {
         assertTrue(val.size() == 1);
     }
 
+    @Test
+    public void testAttribute_noIncludeCoreAttributesUserDefined() throws IOException {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, " " + TEST_ATTRIBUTE_KEY + " , " + CoreAttributes.PATH.key() + " ");
+        testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "false");
+
+        ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+        ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE);
+        ff = session.putAttribute(ff, CoreAttributes.PATH.key(), TEST_ATTRIBUTE_VALUE);
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0)
+                .assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+
+        String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
+                .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, String> val = mapper.readValue(json, HashMap.class);
+        assertEquals(TEST_ATTRIBUTE_VALUE, val.get(TEST_ATTRIBUTE_KEY));
+        assertEquals(1, val.size());
+    }
+
+    @Test
+    public void testAttribute_noIncludeCoreAttributesContent() throws IOException {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "false");
+        testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_CONTENT);
+
+        ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+        ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE);
+        ff = session.putAttribute(ff, CoreAttributes.PATH.key(), TEST_ATTRIBUTE_VALUE);
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+
+        testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, String> val = mapper.readValue(testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).toByteArray(), HashMap.class);
+        assertEquals(TEST_ATTRIBUTE_VALUE, val.get(TEST_ATTRIBUTE_KEY));
+        assertEquals(1, val.size());
+    }
+
+    @Test
+    public void testAttribute_includeCoreAttributesContent() throws IOException {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_CONTENT);
+        testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "true");
+
+        ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+
+        List<MockFlowFile> flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS);
+
+        testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+        testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = flowFilesForRelationship.get(0);
+
+        assertEquals(AttributesToJSON.APPLICATION_JSON, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
+
+        Map<String, String> val = new ObjectMapper().readValue(flowFile.toByteArray(), HashMap.class);
+        assertEquals(3, val.size());
+        Set<String> coreAttributes = Arrays.stream(CoreAttributes.values()).map(CoreAttributes::key).collect(Collectors.toSet());
+        val.keySet().forEach(k -> assertTrue(coreAttributes.contains(k)));
+    }
+
+    @Test
+    public void testAttribute_includeCoreAttributesAttribute() throws IOException {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "true");
+
+        ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+
+        List<MockFlowFile> flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS);
+
+        testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+        testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = flowFilesForRelationship.get(0);
+
+        assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
+
+        Map<String, String> val = new ObjectMapper().readValue(flowFile.getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME), HashMap.class);
+        assertEquals(3, val.size());
+        Set<String> coreAttributes = Arrays.stream(CoreAttributes.values()).map(CoreAttributes::key).collect(Collectors.toSet());
+        val.keySet().forEach(k -> assertTrue(coreAttributes.contains(k)));
+    }
 }

Reply via email to