Repository: nifi
Updated Branches:
  refs/heads/master 270944ec6 -> fa390112c


NIFI-1213 Added the possibility to register FlowFile assertions in mock 
framework
This closes #404


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fa390112
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fa390112
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fa390112

Branch: refs/heads/master
Commit: fa390112cec29813921d1818623904b01604beb1
Parents: 270944e
Author: Pierre Villard <[email protected]>
Authored: Mon May 2 19:54:53 2016 +0200
Committer: Oleg Zhurakousky <[email protected]>
Committed: Mon May 16 18:58:15 2016 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/util/FlowFileValidator.java | 30 +++++++
 .../apache/nifi/util/MockProcessSession.java    | 33 ++++++++
 .../nifi/util/StandardProcessorTestRunner.java  | 34 ++++++++
 .../java/org/apache/nifi/util/TestRunner.java   | 33 ++++++++
 .../util/TestStandardProcessorTestRunner.java   | 89 ++++++++++++++++++++
 5 files changed, 219 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/fa390112/nifi-mock/src/main/java/org/apache/nifi/util/FlowFileValidator.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/FlowFileValidator.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/FlowFileValidator.java
new file mode 100644
index 0000000..aacc3cb
--- /dev/null
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/FlowFileValidator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+public interface FlowFileValidator {
+
+    /**
+     * Define a verification method to validate the given FlowFile
+     *
+     * @param f Flow file
+     */
+    void assertFlowFile(FlowFile f);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fa390112/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index ea45dbf..2b22761 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -988,6 +988,39 @@ public class MockProcessSession implements ProcessSession {
     }
 
     /**
+     * Asserts that all FlowFiles that were transferred are compliant with the
+     * given validator.
+     *
+     * @param validator validator to use
+     */
+    public void assertAllFlowFiles(FlowFileValidator validator) {
+        for (final Map.Entry<Relationship, List<MockFlowFile>> entry : 
transferMap.entrySet()) {
+            final List<MockFlowFile> flowFiles = entry.getValue();
+            for (MockFlowFile mockFlowFile : flowFiles) {
+                validator.assertFlowFile(mockFlowFile);
+            }
+        }
+    }
+
+    /**
+     * Asserts that all FlowFiles that were transferred in the given 
relationship
+     * are compliant with the given validator.
+     *
+     * @param validator validator to use
+     */
+    public void assertAllFlowFiles(Relationship relationship, 
FlowFileValidator validator) {
+        for (final Map.Entry<Relationship, List<MockFlowFile>> entry : 
transferMap.entrySet()) {
+            final List<MockFlowFile> flowFiles = entry.getValue();
+            final Relationship rel = entry.getKey();
+            for (MockFlowFile mockFlowFile : flowFiles) {
+                if(rel.equals(relationship)) {
+                    validator.assertFlowFile(mockFlowFile);
+                }
+            }
+        }
+    }
+
+    /**
      * Removes all state information about FlowFiles that have been transferred
      */
     public void clearTransferState() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/fa390112/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 5aa6d43..edc1f06 100644
--- 
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ 
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -320,6 +320,40 @@ public class StandardProcessorTestRunner implements 
TestRunner {
     }
 
     @Override
+    public void assertAllFlowFilesContainAttribute(String attributeName) {
+        assertAllFlowFiles(new FlowFileValidator() {
+            @Override
+            public void assertFlowFile(FlowFile f) {
+                Assert.assertTrue(f.getAttribute(attributeName) != null);
+            }
+        });
+    }
+
+    @Override
+    public void assertAllFlowFilesContainAttribute(Relationship relationship, 
String attributeName) {
+        assertAllFlowFiles(relationship, new FlowFileValidator() {
+            @Override
+            public void assertFlowFile(FlowFile f) {
+                Assert.assertTrue(f.getAttribute(attributeName) != null);
+            }
+        });
+    }
+
+    @Override
+    public void assertAllFlowFiles(FlowFileValidator validator) {
+        for (final MockProcessSession session : 
sessionFactory.getCreatedSessions()) {
+            session.assertAllFlowFiles(validator);
+        }
+    }
+
+    @Override
+    public void assertAllFlowFiles(Relationship relationship, 
FlowFileValidator validator) {
+        for (final MockProcessSession session : 
sessionFactory.getCreatedSessions()) {
+            session.assertAllFlowFiles(relationship, validator);
+        }
+    }
+
+    @Override
     public void assertAllFlowFilesTransferred(final Relationship relationship, 
final int count) {
         assertAllFlowFilesTransferred(relationship);
         assertTransferCount(relationship, count);

http://git-wip-us.apache.org/repos/asf/nifi/blob/fa390112/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index d1211ef..5832c2e 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -273,6 +273,39 @@ public interface TestRunner {
     void assertAllFlowFilesTransferred(Relationship relationship, int count);
 
     /**
+     * Asserts that all FlowFiles that were transferred contain the given
+     * attribute.
+     *
+     * @param attributeName attribute to look for
+     */
+    void assertAllFlowFilesContainAttribute(String attributeName);
+
+    /**
+     * Asserts that all FlowFiles that were transferred to the given
+     * relationship contain the given attribute.
+     *
+     * @param relationship relationship to check
+     * @param attributeName attribute to look for
+     */
+    void assertAllFlowFilesContainAttribute(Relationship relationship, String 
attributeName);
+
+    /**
+     * Asserts that all FlowFiles that were transferred are compliant with the
+     * given validator.
+     *
+     * @param validator validator to use
+     */
+    void assertAllFlowFiles(FlowFileValidator validator);
+
+    /**
+     * Asserts that all FlowFiles that were transferred in the given 
relationship
+     * are compliant with the given validator.
+     *
+     * @param validator validator to use
+     */
+    void assertAllFlowFiles(Relationship relationship, FlowFileValidator 
validator);
+
+    /**
      * Assert that the number of FlowFiles transferred to the given 
relationship
      * is equal to the given count
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/fa390112/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java
 
b/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java
index 2ac908f..342b016 100644
--- 
a/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java
+++ 
b/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java
@@ -18,10 +18,16 @@ package org.apache.nifi.util;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -56,6 +62,55 @@ public class TestStandardProcessorTestRunner {
         assertEquals(5, runner.getProcessContext().getMaxConcurrentTasks());
     }
 
+    @Test
+    public void testFlowFileValidator() {
+        final AddAttributeProcessor proc = new AddAttributeProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+
+        runner.run(5, true);
+        runner.assertTransferCount(AddAttributeProcessor.REL_SUCCESS, 3);
+        runner.assertTransferCount(AddAttributeProcessor.REL_FAILURE, 2);
+        
runner.assertAllFlowFilesContainAttribute(AddAttributeProcessor.REL_SUCCESS, 
AddAttributeProcessor.KEY);
+        runner.assertAllFlowFiles(AddAttributeProcessor.REL_SUCCESS, new 
FlowFileValidator() {
+            @Override
+            public void assertFlowFile(FlowFile f) {
+                assertEquals("value", 
f.getAttribute(AddAttributeProcessor.KEY));
+            }
+        });
+    }
+
+    @Test(expected = AssertionError.class)
+    public void testFailFlowFileValidator() {
+        final AddAttributeProcessor proc = new AddAttributeProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+
+        runner.run(5, true);
+        runner.assertAllFlowFiles(new FlowFileValidator() {
+            @Override
+            public void assertFlowFile(FlowFile f) {
+                assertEquals("value", 
f.getAttribute(AddAttributeProcessor.KEY));
+            }
+        });
+    }
+
+    @Test(expected = AssertionError.class)
+    public void testFailAllFlowFilesContainAttribute() {
+        final AddAttributeProcessor proc = new AddAttributeProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+
+        runner.run(5, true);
+        runner.assertAllFlowFilesContainAttribute(AddAttributeProcessor.KEY);
+    }
+
+    @Test
+    public void testAllFlowFilesContainAttribute() {
+        final AddAttributeProcessor proc = new AddAttributeProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+
+        runner.run(1, true);
+        runner.assertAllFlowFilesContainAttribute(AddAttributeProcessor.KEY);
+    }
+
     @Test(expected = AssertionError.class)
     @Ignore("This should not be enabled until we actually fail processor unit 
tests for using deprecated methods")
     public void testFailOnDeprecatedTypeAnnotation() {
@@ -150,4 +205,38 @@ public class TestStandardProcessorTestRunner {
         }
 
     }
+
+    private static class AddAttributeProcessor extends AbstractProcessor {
+        public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success").description("success").build();
+        public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure").description("failure").build();
+        public static final String KEY = "KEY";
+
+        private Set<Relationship> relationships;
+        private int counter = 0;
+
+        @Override
+        protected void init(final ProcessorInitializationContext context) {
+            final Set<Relationship> relationships = new HashSet<>();
+            relationships.add(REL_SUCCESS);
+            relationships.add(REL_FAILURE);
+            this.relationships = Collections.unmodifiableSet(relationships);
+        }
+
+        @Override
+        public Set<Relationship> getRelationships() {
+            return relationships;
+        }
+
+        @Override
+        public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+            FlowFile ff = session.create();
+            if(counter % 2 == 0) {
+                ff = session.putAttribute(ff, KEY, "value");
+                session.transfer(ff, REL_SUCCESS);
+            } else {
+                session.transfer(ff, REL_FAILURE);
+            }
+            counter++;
+        }
+    }
 }

Reply via email to