Repository: nifi
Updated Branches:
  refs/heads/master b693a4a56 -> ad3d63d20


nifi-1214d Mock Framework should allow order-independent assumptions on 
FlowFiles.

This closes #1033

Signed-off-by: jpercivall <joeperciv...@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ad3d63d2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ad3d63d2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ad3d63d2

Branch: refs/heads/master
Commit: ad3d63d204c1e4408c91728d4c5d76a75e14c8f9
Parents: b693a4a
Author: Toivo Adams <toivo.ad...@gmail.com>
Authored: Tue Sep 20 11:35:07 2016 +0300
Committer: jpercivall <joeperciv...@yahoo.com>
Committed: Tue Sep 20 10:25:36 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/nifi/util/MockFlowFile.java | 22 ++++++
 .../nifi/util/StandardProcessorTestRunner.java  | 36 +++++++++
 .../java/org/apache/nifi/util/TestRunner.java   | 17 +++++
 .../util/TestStandardProcessorTestRunner.java   | 78 ++++++++++++++++++++
 4 files changed, 153 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ad3d63d2/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
index 516c8a4..9848a3d 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
@@ -29,6 +29,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 
@@ -290,4 +291,25 @@ public class MockFlowFile implements FlowFileRecord {
     public long getQueueDateIndex() {
         return 0;
     }
+    public boolean isAttributeEqual(final String attributeName, final String 
expectedValue) {
+        // unknown attribute name, so cannot be equal.
+        if (attributes.containsKey(attributeName) == false)
+            return false;
+
+        String value = attributes.get(attributeName);
+        return Objects.equals(expectedValue, value);
+    }
+
+    public boolean isContentEqual(String expected) {
+        return isContentEqual(expected, Charset.forName("UTF-8"));
+    }
+
+    public boolean isContentEqual(String expected, final Charset charset) {
+        final String value = new String(this.data, charset);
+        return Objects.equals(expected, value);
+    }
+
+    public boolean isContentEqual(final byte[] expected) {
+        return Arrays.equals(expected, this.data);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ad3d63d2/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 138524d..2a1451a 100644
--- 
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ 
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -42,6 +42,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
@@ -855,4 +857,38 @@ public class StandardProcessorTestRunner implements 
TestRunner {
 
         return variableRegistry.removeVariable(new 
VariableDescriptor.Builder(name).build());
     }
+
+    /**
+     * Asserts that all FlowFiles meet all conditions.
+     *
+     * @param relationshipName relationship name
+     * @param predicate conditions
+     */
+    @Override
+    public void assertAllConditionsMet(final String relationshipName, 
Predicate<MockFlowFile> predicate) {
+        assertAllConditionsMet(new 
Relationship.Builder().name(relationshipName).build(), predicate);
+    }
+
+    /**
+     * Asserts that all FlowFiles meet all conditions.
+     *
+     * @param relationship relationship
+     * @param predicate conditions
+     */
+    @Override
+    public void assertAllConditionsMet(final Relationship relationship, 
Predicate<MockFlowFile> predicate) {
+
+        if (predicate==null)
+            Assert.fail("predicate cannot be null");
+
+        final List<MockFlowFile> flowFiles = 
getFlowFilesForRelationship(relationship);
+
+        if (flowFiles.isEmpty())
+            Assert.fail("Relationship " + relationship.getName() + " does not 
contain any FlowFile");
+
+        for (MockFlowFile flowFile : flowFiles) {
+            if (predicate.test(flowFile)==false)
+                Assert.fail("FlowFile " + flowFile + " does not meet all 
condition");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ad3d63d2/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 78d4d00..63b7781 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -21,6 +21,7 @@ import java.io.InputStream;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Predicate;
 
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -932,4 +933,20 @@ public interface TestRunner {
      * @throws NullPointerException if the name is null
      */
     String removeVariable(String name);
+
+    /**
+     * Asserts that all FlowFiles meet all conditions.
+     *
+     * @param relationshipName relationship name
+     * @param predicate conditions
+     */
+    void assertAllConditionsMet(final String relationshipName, 
Predicate<MockFlowFile> predicate);
+
+    /**
+     * Asserts that all FlowFiles meet all conditions.
+     *
+     * @param relationship relationship
+     * @param predicate conditions
+     */
+    void assertAllConditionsMet(final Relationship relationship, 
Predicate<MockFlowFile> predicate);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ad3d63d2/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java
 
b/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java
index c65a7ba..c5776d9 100644
--- 
a/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java
+++ 
b/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java
@@ -20,8 +20,12 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
+import java.util.function.Predicate;
+
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
@@ -54,6 +58,45 @@ public class TestStandardProcessorTestRunner {
     }
 
     @Test
+    public void testAllConditionsMet() {
+        TestRunner runner = new StandardProcessorTestRunner(new 
GoodProcessor());
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("GROUP_ATTRIBUTE_KEY", "1");
+        attributes.put("KeyB", "hihii");
+        runner.enqueue("1,hello\n1,good-bye".getBytes(), attributes);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(GoodProcessor.REL_SUCCESS, 1);
+
+        runner.assertAllConditionsMet("success",
+            mff -> mff.isAttributeEqual("GROUP_ATTRIBUTE_KEY", "1") && 
mff.isContentEqual("1,hello\n1,good-bye")
+        );
+    }
+
+    @Test
+    public void testAllConditionsMetComplex() {
+        TestRunner runner = new StandardProcessorTestRunner(new 
GoodProcessor());
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("GROUP_ATTRIBUTE_KEY", "1");
+        attributes.put("KeyB", "hihii");
+        runner.enqueue("1,hello\n1,good-bye".getBytes(), attributes);
+
+        attributes.clear();
+        attributes.put("age", "34");
+        runner.enqueue("May Andersson".getBytes(), attributes);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(GoodProcessor.REL_SUCCESS, 2);
+
+        Predicate<MockFlowFile> firstPredicate = mff -> 
mff.isAttributeEqual("GROUP_ATTRIBUTE_KEY", "1");
+        Predicate<MockFlowFile> either = firstPredicate.or(mff -> 
mff.isAttributeEqual("age", "34"));
+
+        runner.assertAllConditionsMet("success", either);
+    }
+
+    @Test
     public void testNumThreads() {
         final ProcessorWithOnStop proc = new ProcessorWithOnStop();
         final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -186,4 +229,39 @@ public class TestStandardProcessorTestRunner {
             counter++;
         }
     }
+
+    private static class GoodProcessor extends AbstractProcessor {
+
+        public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
+        .name("success")
+        .description("Successfully created FlowFile from ...")
+        .build();
+
+        public static final Relationship REL_FAILURE = new 
Relationship.Builder()
+        .name("failure")
+        .description("... execution failed. Incoming FlowFile will be 
penalized and routed to this relationship")
+        .build();
+
+        private final Set<Relationship> relationships;
+
+        public GoodProcessor() {
+            final Set<Relationship> r = new HashSet<>();
+            r.add(REL_SUCCESS);
+            r.add(REL_FAILURE);
+            relationships = Collections.unmodifiableSet(r);
+        }
+
+        @Override
+        public Set<Relationship> getRelationships() {
+            return relationships;
+        }
+
+        @Override
+        public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+
+            for( FlowFile incoming : session.get(20)) {
+                session.transfer(incoming, REL_SUCCESS);
+            }
+        }
+    }
 }

Reply via email to