Repository: nifi
Updated Branches:
  refs/heads/master 0609a84fa -> fd35b8ffd


Fixes NIFI-1220.  This closes #133. MockProcessSession returns a new FlowFile
from its `penalize` method instead of mutating and then returning the given FlowFile

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fd35b8ff
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fd35b8ff
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fd35b8ff

Branch: refs/heads/master
Commit: fd35b8ffd7d4a66ef149c1bd1341dfee3ede80c7
Parents: 0609a84
Author: Johnathan Gilday <[email protected]>
Authored: Wed Nov 25 13:34:06 2015 -0500
Committer: joewitt <[email protected]>
Committed: Sat Nov 28 17:44:31 2015 -0500

----------------------------------------------------------------------
 .../apache/nifi/util/MockProcessSession.java    |  6 +-
 .../nifi/util/TestMockProcessSession.java       | 58 ++++++++++++++++++++
 2 files changed, 62 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/fd35b8ff/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 97ec9ca..67cf16e 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -1010,8 +1010,10 @@ public class MockProcessSession implements ProcessSession {
     public MockFlowFile penalize(final FlowFile flowFile) {
         validateState(flowFile);
         final MockFlowFile mockFlowFile = (MockFlowFile) flowFile;
-        mockFlowFile.setPenalized();
-        return mockFlowFile;
+        final MockFlowFile newFlowFile = new MockFlowFile(mockFlowFile.getId(), flowFile);
+        currentVersions.put(newFlowFile.getId(), newFlowFile);
+        newFlowFile.setPenalized();
+        return newFlowFile;
     }
 
     public byte[] getContentAsByteArray(final MockFlowFile flowFile) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/fd35b8ff/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
new file mode 100644
index 0000000..e728072
--- /dev/null
+++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class TestMockProcessSession {
+
+    @Test(expected = AssertionError.class)
+    public void testPenalizeFlowFileFromProcessor() {
+        TestRunners.newTestRunner(PoorlyBehavedProcessor.class).run();
+    }
+
+    protected static class PoorlyBehavedProcessor extends AbstractProcessor {
+
+        private static final Relationship REL_FAILURE = new Relationship.Builder()
+                .name("failure")
+                .build();
+
+        private final Set<Relationship> relationships = Collections.singleton(REL_FAILURE);
+
+        @Override
+        public Set<Relationship> getRelationships() {
+            return relationships;
+        }
+
+        @Override
+        public void onTrigger(final ProcessContext ctx, final ProcessSession session) throws ProcessException {
+            final FlowFile file = session.create();
+            session.penalize(file);
+            session.transfer(file, REL_FAILURE);
+        }
+
+    }
+}

Reply via email to