Repository: nifi Updated Branches: refs/heads/master 2673370cb -> eb6f9f0fe
NIFI-1423 Allow to penalize FlowFiles that are routed to No Retry relationship This closes #183 Signed-off-by: jpercivall <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/eb6f9f0f Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/eb6f9f0f Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/eb6f9f0f Branch: refs/heads/master Commit: eb6f9f0fec3042350864afee17a67ea62ef5f37a Parents: 2673370 Author: ledor473 <[email protected]> Authored: Thu Jan 21 17:15:34 2016 -0500 Committer: jpercivall <[email protected]> Committed: Mon Feb 8 11:20:08 2016 -0500 ---------------------------------------------------------------------- .../apache/nifi/util/MockProcessSession.java | 34 +++++++++-- .../nifi/util/StandardProcessorTestRunner.java | 22 +++++++ .../java/org/apache/nifi/util/TestRunner.java | 15 +++++ .../nifi/processors/standard/InvokeHTTP.java | 14 ++++- .../processors/standard/TestInvokeHTTP.java | 2 + .../standard/util/TestInvokeHttpCommon.java | 62 +++++++++++++++++++- 6 files changed, 141 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/eb6f9f0f/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index 67cf16e..ea45dbf 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -61,6 +61,7 @@ public class MockProcessSession implements ProcessSession { private final Map<Relationship, List<MockFlowFile>> transferMap = new ConcurrentHashMap<>(); private final MockFlowFileQueue processorQueue; private final Set<Long> beingProcessed = new HashSet<>(); + private final List<MockFlowFile> penalized = new ArrayList<>(); private final Map<Long, MockFlowFile> currentVersions = new HashMap<>(); private final Map<Long, MockFlowFile> originalVersions = new HashMap<>(); @@ -429,11 +430,22 @@ public class MockProcessSession implements ProcessSession { @Override public void remove(final FlowFile flowFile) { validateState(flowFile); - final Iterator<Long> itr = beingProcessed.iterator(); - while (itr.hasNext()) { - final Long ffId = itr.next(); + + final Iterator<MockFlowFile> penalizedItr = penalized.iterator(); + while (penalizedItr.hasNext()) { + final MockFlowFile ff = penalizedItr.next(); + if (Objects.equals(ff.getId(), flowFile.getId())) { + penalizedItr.remove(); + penalized.remove(ff); + break; + } + } + + final Iterator<Long> processedItr = beingProcessed.iterator(); + while (processedItr.hasNext()) { + final Long ffId = processedItr.next(); if (ffId != null && ffId.equals(flowFile.getId())) { - itr.remove(); + processedItr.remove(); beingProcessed.remove(ffId); removedCount++; currentVersions.remove(ffId); @@ -522,6 +534,9 @@ public class MockProcessSession implements ProcessSession { for (final List<MockFlowFile> list : transferMap.values()) { for (final MockFlowFile flowFile : list) { processorQueue.offer(flowFile); + if (penalize) { + penalized.add(flowFile); + } } } @@ -529,6 +544,9 @@ public class MockProcessSession implements ProcessSession { final MockFlowFile flowFile = originalVersions.get(flowFileId); if (flowFile != null) { processorQueue.offer(flowFile); + if (penalize) { + penalized.add(flowFile); + } } } @@ -538,6 +556,9 @@ public class MockProcessSession implements ProcessSession { originalVersions.clear(); transferMap.clear(); clearTransferState(); + if (!penalize) { + penalized.clear(); + } } @Override @@ -696,6 +717,10 @@ public class MockProcessSession implements ProcessSession { return list; } + public List<MockFlowFile> getPenalizedFlowFiles() { + return penalized; + } + /** * @param relationship to get flowfiles for * @return a List of FlowFiles in the order in which they were transferred @@ -1013,6 +1038,7 @@ public class MockProcessSession implements ProcessSession { final MockFlowFile newFlowFile = new MockFlowFile(mockFlowFile.getId(), flowFile); currentVersions.put(newFlowFile.getId(), newFlowFile); newFlowFile.setPenalized(); + penalized.add(newFlowFile); return newFlowFile; } http://git-wip-us.apache.org/repos/asf/nifi/blob/eb6f9f0f/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 7358b42..c7cded1 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -336,6 +336,11 @@ public class StandardProcessorTestRunner implements TestRunner { } @Override + public void assertPenalizeCount(final int count) { + Assert.assertEquals(count, getPenalizedFlowFiles().size()); + } + + @Override public void assertValid() { context.assertValid(); } @@ -453,6 +458,23 @@ public class StandardProcessorTestRunner implements TestRunner { return flowFiles; } + @Override + public List<MockFlowFile> getPenalizedFlowFiles() { + final List<MockFlowFile> flowFiles = new ArrayList<>(); + for (final MockProcessSession session : sessionFactory.getCreatedSessions()) { + flowFiles.addAll(session.getPenalizedFlowFiles()); + } + + Collections.sort(flowFiles, new Comparator<MockFlowFile>() { + @Override + public int compare(final MockFlowFile o1, final MockFlowFile o2) { + return Long.compare(o1.getCreationTime(), o2.getCreationTime()); + } + }); + + return flowFiles; + } + /** * @deprecated The ProvenanceReporter should not be accessed through the test runner, as it does not expose the events that were emitted. */ http://git-wip-us.apache.org/repos/asf/nifi/blob/eb6f9f0f/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java index 5e45299..d1211ef 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java @@ -291,6 +291,14 @@ public interface TestRunner { void assertTransferCount(String relationship, int count); /** + * Assert that the number of FlowFiles that were penalized is equal to the given count + * + * @param count + * number of expected penalized + */ + void assertPenalizeCount(int count); + + /** * Assert that there are no FlowFiles left on the input queue. */ void assertQueueEmpty(); @@ -438,6 +446,13 @@ public interface TestRunner { List<MockFlowFile> getFlowFilesForRelationship(Relationship relationship); /** + * Returns a List of FlowFiles in the order in which they were transferred that were penalized + * + * @return flowfiles that were penalized + */ + List<MockFlowFile> getPenalizedFlowFiles(); + + /** * @return the {@link ProvenanceReporter} that will be used by the * configured {@link Processor} for reporting Provenance Events */ http://git-wip-us.apache.org/repos/asf/nifi/blob/eb6f9f0f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java index 7576be3..4470bf6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java @@ -319,6 +319,14 @@ public final class InvokeHTTP extends AbstractProcessor { .allowableValues("true", "false") .build(); + public static final PropertyDescriptor PROP_PENALIZE_NO_RETRY = new PropertyDescriptor.Builder() + .name("Penalize on \"No Retry\"") + .description("Enabling this property will penalize FlowFiles that are routed to the \"No Retry\" relationship.") + .required(false) + .defaultValue("false") + .allowableValues("true", "false") + .build(); + public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( PROP_METHOD, PROP_URL, @@ -339,7 +347,8 @@ public final class InvokeHTTP extends AbstractProcessor { PROP_TRUSTED_HOSTNAME, PROP_ADD_HEADERS_TO_REQUEST, PROP_CONTENT_TYPE, - PROP_USE_CHUNKED_ENCODING)); + PROP_USE_CHUNKED_ENCODING, + PROP_PENALIZE_NO_RETRY)); // relationships public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder() @@ -844,6 +853,9 @@ public final class InvokeHTTP extends AbstractProcessor { // 1xx, 3xx, 4xx -> NO RETRY } else { if (request != null) { + if (context.getProperty(PROP_PENALIZE_NO_RETRY).asBoolean()) { + request = session.penalize(request); + } session.transfer(request, REL_NO_RETRY); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/eb6f9f0f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java index d889dee..4497a8b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java @@ -107,6 +107,7 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); // expected in request status.code and status.message // original flow file (+attributes) @@ -153,6 +154,7 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); //expected in request status.code and status.message //original flow file (+attributes) http://git-wip-us.apache.org/repos/asf/nifi/blob/eb6f9f0f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java index a78fb97..d0f29b9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java @@ -114,6 +114,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); // expected in request status.code and status.message // original flow file (+attributes) @@ -166,6 +167,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); // expected in request status.code and status.message // original flow file (+attributes) @@ -204,6 +206,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); // expected in request status.code and status.message // original flow file (+attributes) @@ -244,6 +247,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY,0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); // expected in request status.code and status.message // original flow file (+attributes) @@ -285,6 +289,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); // expected in request status.code and status.message // original flow file (+attributes) @@ -325,6 +330,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); // expected in request status.code and status.message // original flow file (+attributes) @@ -362,6 +368,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); // expected in request status.code and status.message // original flow file (+all attributes from response) @@ -401,6 +408,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); // expected in request status.code, status.message and body of response in attribute // original flow file (+attributes) @@ -428,6 +436,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); // expected in response // status code, status message, all headers from server response --> ff attributes @@ -455,6 +464,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); runner.setProperty(InvokeHTTP.PROP_METHOD,"OPTION"); @@ -465,6 +475,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); } @Test @@ -483,6 +494,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); // expected in request // status code, status message, no ff content @@ -516,6 +528,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); // expected in request status.code and status.message // original flow file (+attributes) @@ -572,6 +585,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); // expected in request status.code and status.message // original flow file (+attributes) @@ -607,6 +621,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); //expected in request status.code and status.message //original flow file (+attributes) @@ -646,6 +661,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); //expected in request status.code and status.message //original flow file (+attributes) @@ -686,6 +702,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 1); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(1); // expected in response final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RETRY).get(0); @@ -714,7 +731,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); - // getMyFlowFiles(); + runner.assertPenalizeCount(0); // expected in response final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); @@ -741,7 +758,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); - // getMyFlowFiles(); + runner.assertPenalizeCount(0); // expected in response final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); @@ -768,7 +785,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); - // getMyFlowFiles(); + runner.assertPenalizeCount(0); // expected in response final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); @@ -783,6 +800,34 @@ public abstract class TestInvokeHttpCommon { } @Test + public void test400WithPenalizeNoRetry() throws Exception { + addHandler(new GetOrHeadHandler()); + + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/400"); + runner.setProperty(InvokeHTTP.PROP_PENALIZE_NO_RETRY, "true"); + + createFlowFiles(runner); + + runner.run(); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(1); + // expected in response + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0); + final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); + + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "400"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Bad Request"); + bundle.assertAttributeEquals(InvokeHTTP.RESPONSE_BODY, "/status/400"); + final String expected = "Hello"; + Assert.assertEquals(expected, actual); + bundle.assertAttributeEquals("Foo", "Bar"); + } + + @Test public void test412() throws Exception { addHandler(new GetOrHeadHandler()); @@ -797,6 +842,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); // expected in response final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0); @@ -826,6 +872,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); bundle.assertContentEquals("Hello".getBytes("UTF-8")); @@ -859,6 +906,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); bundle.assertContentEquals("Hello".getBytes("UTF-8")); @@ -997,6 +1045,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); bundle.assertContentEquals("Hello".getBytes("UTF-8")); @@ -1031,6 +1080,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); bundle.assertContentEquals("Hello".getBytes("UTF-8")); @@ -1063,6 +1113,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); bundle.assertContentEquals("Hello".getBytes("UTF-8")); @@ -1094,6 +1145,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); //expected in request status.code and status.message //original flow file (+attributes) @@ -1130,6 +1182,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1); + runner.assertPenalizeCount(1); final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0); @@ -1155,6 +1208,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1); + runner.assertPenalizeCount(1); final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0); @@ -1179,6 +1233,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1); + runner.assertPenalizeCount(1); final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0); @@ -1204,6 +1259,7 @@ public abstract class TestInvokeHttpCommon { runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + runner.assertPenalizeCount(0); //expected in request status.code and status.message //original flow file (+attributes)
