Repository: nifi Updated Branches: refs/heads/master 0609a84fa -> fd35b8ffd
Fixes NIFI-1220. This closes #133. MockProcessSession returns a new FlowFile from its `penalize` method instead of mutating then returning the given FlowFile Signed-off-by: joewitt <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fd35b8ff Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fd35b8ff Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fd35b8ff Branch: refs/heads/master Commit: fd35b8ffd7d4a66ef149c1bd1341dfee3ede80c7 Parents: 0609a84 Author: Johnathan Gilday <[email protected]> Authored: Wed Nov 25 13:34:06 2015 -0500 Committer: joewitt <[email protected]> Committed: Sat Nov 28 17:44:31 2015 -0500 ---------------------------------------------------------------------- .../apache/nifi/util/MockProcessSession.java | 6 +- .../nifi/util/TestMockProcessSession.java | 58 ++++++++++++++++++++ 2 files changed, 62 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/fd35b8ff/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index 97ec9ca..67cf16e 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -1010,8 +1010,10 @@ public class MockProcessSession implements ProcessSession { public MockFlowFile penalize(final FlowFile flowFile) { validateState(flowFile); final MockFlowFile mockFlowFile = (MockFlowFile) flowFile; - mockFlowFile.setPenalized(); - return mockFlowFile; + final MockFlowFile newFlowFile = new MockFlowFile(mockFlowFile.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); 
newFlowFile.setPenalized(); + return newFlowFile; } public byte[] getContentAsByteArray(final MockFlowFile flowFile) { http://git-wip-us.apache.org/repos/asf/nifi/blob/fd35b8ff/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java new file mode 100644 index 0000000..e728072 --- /dev/null +++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.util; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.junit.Test; + +import java.util.Collections; +import java.util.Set; + +public class TestMockProcessSession { + + @Test(expected = AssertionError.class) + public void testPenalizeFlowFileFromProcessor() { + TestRunners.newTestRunner(PoorlyBehavedProcessor.class).run(); + } + + protected static class PoorlyBehavedProcessor extends AbstractProcessor { + + private static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .build(); + + private final Set<Relationship> relationships = Collections.singleton(REL_FAILURE); + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + public void onTrigger(final ProcessContext ctx, final ProcessSession session) throws ProcessException { + final FlowFile file = session.create(); + session.penalize(file); + session.transfer(file, REL_FAILURE); + } + + } +}
