Repository: nifi Updated Branches: refs/heads/master 270944ec6 -> fa390112c
NIFI-1213 Added the possibility to register FlowFile assertions in mock framework This closes #404 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fa390112 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fa390112 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fa390112 Branch: refs/heads/master Commit: fa390112cec29813921d1818623904b01604beb1 Parents: 270944e Author: Pierre Villard <[email protected]> Authored: Mon May 2 19:54:53 2016 +0200 Committer: Oleg Zhurakousky <[email protected]> Committed: Mon May 16 18:58:15 2016 -0400 ---------------------------------------------------------------------- .../org/apache/nifi/util/FlowFileValidator.java | 30 +++++++ .../apache/nifi/util/MockProcessSession.java | 33 ++++++++ .../nifi/util/StandardProcessorTestRunner.java | 34 ++++++++ .../java/org/apache/nifi/util/TestRunner.java | 33 ++++++++ .../util/TestStandardProcessorTestRunner.java | 89 ++++++++++++++++++++ 5 files changed, 219 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/fa390112/nifi-mock/src/main/java/org/apache/nifi/util/FlowFileValidator.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/FlowFileValidator.java b/nifi-mock/src/main/java/org/apache/nifi/util/FlowFileValidator.java new file mode 100644 index 0000000..aacc3cb --- /dev/null +++ b/nifi-mock/src/main/java/org/apache/nifi/util/FlowFileValidator.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import org.apache.nifi.flowfile.FlowFile; + +public interface FlowFileValidator { + + /** + * Define a verification method to validate the given FlowFile + * + * @param f Flow file + */ + void assertFlowFile(FlowFile f); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fa390112/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index ea45dbf..2b22761 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -988,6 +988,39 @@ public class MockProcessSession implements ProcessSession { } /** + * Asserts that all FlowFiles that were transferred are compliant with the + * given validator. + * + * @param validator validator to use + */ + public void assertAllFlowFiles(FlowFileValidator validator) { + for (final Map.Entry<Relationship, List<MockFlowFile>> entry : transferMap.entrySet()) { + final List<MockFlowFile> flowFiles = entry.getValue(); + for (MockFlowFile mockFlowFile : flowFiles) { + validator.assertFlowFile(mockFlowFile); + } + } + } + + /** + * Asserts that all FlowFiles that were transferred in the given relationship + * are compliant with the given validator. + * + * @param validator validator to use + */ + public void assertAllFlowFiles(Relationship relationship, FlowFileValidator validator) { + for (final Map.Entry<Relationship, List<MockFlowFile>> entry : transferMap.entrySet()) { + final List<MockFlowFile> flowFiles = entry.getValue(); + final Relationship rel = entry.getKey(); + for (MockFlowFile mockFlowFile : flowFiles) { + if(rel.equals(relationship)) { + validator.assertFlowFile(mockFlowFile); + } + } + } + } + + /** * Removes all state information about FlowFiles that have been transferred */ public void clearTransferState() { http://git-wip-us.apache.org/repos/asf/nifi/blob/fa390112/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 5aa6d43..edc1f06 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -320,6 +320,40 @@ public class StandardProcessorTestRunner implements TestRunner { } @Override + public void assertAllFlowFilesContainAttribute(String attributeName) { + assertAllFlowFiles(new FlowFileValidator() { + @Override + public void assertFlowFile(FlowFile f) { + Assert.assertTrue(f.getAttribute(attributeName) != null); + } + }); + } + + @Override + public void assertAllFlowFilesContainAttribute(Relationship relationship, String attributeName) { + assertAllFlowFiles(relationship, new FlowFileValidator() { + @Override + public void assertFlowFile(FlowFile f) { + Assert.assertTrue(f.getAttribute(attributeName) != null); + } + }); + } + + @Override + public void assertAllFlowFiles(FlowFileValidator validator) { + for (final MockProcessSession session : sessionFactory.getCreatedSessions()) { + session.assertAllFlowFiles(validator); + } + } + + @Override + public void assertAllFlowFiles(Relationship relationship, FlowFileValidator validator) { + for (final MockProcessSession session : sessionFactory.getCreatedSessions()) { + session.assertAllFlowFiles(relationship, validator); + } + } + + @Override public void assertAllFlowFilesTransferred(final Relationship relationship, final int count) { assertAllFlowFilesTransferred(relationship); assertTransferCount(relationship, count); http://git-wip-us.apache.org/repos/asf/nifi/blob/fa390112/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java index d1211ef..5832c2e 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java @@ -273,6 +273,39 @@ public interface TestRunner { void assertAllFlowFilesTransferred(Relationship relationship, int count); /** + * Asserts that all FlowFiles that were transferred contain the given + * attribute. + * + * @param attributeName attribute to look for + */ + void assertAllFlowFilesContainAttribute(String attributeName); + + /** + * Asserts that all FlowFiles that were transferred to the given + * relationship contain the given attribute. + * + * @param relationship relationship to check + * @param attributeName attribute to look for + */ + void assertAllFlowFilesContainAttribute(Relationship relationship, String attributeName); + + /** + * Asserts that all FlowFiles that were transferred are compliant with the + * given validator. + * + * @param validator validator to use + */ + void assertAllFlowFiles(FlowFileValidator validator); + + /** + * Asserts that all FlowFiles that were transferred in the given relationship + * are compliant with the given validator. + * + * @param validator validator to use + */ + void assertAllFlowFiles(Relationship relationship, FlowFileValidator validator); + + /** * Assert that the number of FlowFiles transferred to the given relationship * is equal to the given count * http://git-wip-us.apache.org/repos/asf/nifi/blob/fa390112/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java index 2ac908f..342b016 100644 --- a/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java +++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java @@ -18,10 +18,16 @@ package org.apache.nifi.util; import static org.junit.Assert.assertEquals; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.junit.Ignore; import org.junit.Test; @@ -56,6 +62,55 @@ public class TestStandardProcessorTestRunner { assertEquals(5, runner.getProcessContext().getMaxConcurrentTasks()); } + @Test + public void testFlowFileValidator() { + final AddAttributeProcessor proc = new AddAttributeProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + + runner.run(5, true); + runner.assertTransferCount(AddAttributeProcessor.REL_SUCCESS, 3); + runner.assertTransferCount(AddAttributeProcessor.REL_FAILURE, 2); + runner.assertAllFlowFilesContainAttribute(AddAttributeProcessor.REL_SUCCESS, AddAttributeProcessor.KEY); + runner.assertAllFlowFiles(AddAttributeProcessor.REL_SUCCESS, new FlowFileValidator() { + @Override + public void assertFlowFile(FlowFile f) { + assertEquals("value", f.getAttribute(AddAttributeProcessor.KEY)); + } + }); + } + + @Test(expected = AssertionError.class) + public void testFailFlowFileValidator() { + final AddAttributeProcessor proc = new AddAttributeProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + + runner.run(5, true); + runner.assertAllFlowFiles(new FlowFileValidator() { + @Override + public void assertFlowFile(FlowFile f) { + assertEquals("value", f.getAttribute(AddAttributeProcessor.KEY)); + } + }); + } + + @Test(expected = AssertionError.class) + public void testFailAllFlowFilesContainAttribute() { + final AddAttributeProcessor proc = new AddAttributeProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + + runner.run(5, true); + runner.assertAllFlowFilesContainAttribute(AddAttributeProcessor.KEY); + } + + @Test + public void testAllFlowFilesContainAttribute() { + final AddAttributeProcessor proc = new AddAttributeProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + + runner.run(1, true); + runner.assertAllFlowFilesContainAttribute(AddAttributeProcessor.KEY); + } + @Test(expected = AssertionError.class) @Ignore("This should not be enabled until we actually fail processor unit tests for using deprecated methods") public void testFailOnDeprecatedTypeAnnotation() { @@ -150,4 +205,38 @@ public class TestStandardProcessorTestRunner { } } + + private static class AddAttributeProcessor extends AbstractProcessor { + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("success").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("failure").build(); + public static final String KEY = "KEY"; + + private Set<Relationship> relationships; + private int counter = 0; + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile ff = session.create(); + if(counter % 2 == 0) { + ff = session.putAttribute(ff, KEY, "value"); + session.transfer(ff, REL_SUCCESS); + } else { + session.transfer(ff, REL_FAILURE); + } + counter++; + } + } }
