http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
new file mode 100644
index 0000000..4ad3138
--- /dev/null
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -0,0 +1,1037 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.OpenOption;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.QueueSize;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.FlowFileHandlingException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.provenance.ProvenanceReporter;
+import org.junit.Assert;
+
+public class MockProcessSession implements ProcessSession {
+
+    private final Map<Relationship, List<MockFlowFile>> transferMap = new 
ConcurrentHashMap<>();
+    private final MockFlowFileQueue processorQueue;
+    private final Set<Long> beingProcessed = new HashSet<>();
+
+    private final Map<Long, MockFlowFile> currentVersions = new HashMap<>();
+    private final Map<Long, MockFlowFile> originalVersions = new HashMap<>();
+    private final SharedSessionState sharedState;
+    private final Map<String, Long> counterMap = new HashMap<>();
+    private final MockProvenanceReporter provenanceReporter;
+
+    private boolean committed = false;
+    private boolean rolledback = false;
+    private int removedCount = 0;
+
+    public MockProcessSession(final SharedSessionState sharedState, final 
Processor processor) {
+        this.sharedState = sharedState;
+        this.processorQueue = sharedState.getFlowFileQueue();
+        provenanceReporter = new MockProvenanceReporter(this, sharedState, 
processor.getIdentifier(), processor.getClass().getSimpleName());
+    }
+
+    @Override
+    public void adjustCounter(final String name, final long delta, final 
boolean immediate) {
+        if (immediate) {
+            sharedState.adjustCounter(name, delta);
+            return;
+        }
+
+        Long counter = counterMap.get(name);
+        if (counter == null) {
+            counter = delta;
+            counterMap.put(name, counter);
+            return;
+        }
+
+        counter = counter + delta;
+        counterMap.put(name, counter);
+    }
+
+    @Override
+    public MockFlowFile clone(final FlowFile flowFile) {
+        validateState(flowFile);
+        final MockFlowFile newFlowFile = new 
MockFlowFile(sharedState.nextFlowFileId(), flowFile);
+        currentVersions.put(newFlowFile.getId(), newFlowFile);
+        beingProcessed.add(newFlowFile.getId());
+        return newFlowFile;
+    }
+
+    @Override
+    public MockFlowFile clone(final FlowFile flowFile, final long offset, 
final long size) {
+        validateState(flowFile);
+        if (offset + size > flowFile.getSize()) {
+            throw new FlowFileHandlingException("Specified offset of " + 
offset + " and size " + size + " exceeds size of " + flowFile.toString());
+        }
+
+        final MockFlowFile newFlowFile = new 
MockFlowFile(sharedState.nextFlowFileId(), flowFile);
+        final byte[] newContent = Arrays.copyOfRange(((MockFlowFile) 
flowFile).getData(), (int) offset, (int) (offset + size));
+        newFlowFile.setData(newContent);
+
+        currentVersions.put(newFlowFile.getId(), newFlowFile);
+        beingProcessed.add(newFlowFile.getId());
+        return newFlowFile;
+    }
+
+    @Override
+    public void commit() {
+        if (!beingProcessed.isEmpty()) {
+            throw new FlowFileHandlingException("Cannot commit session because 
the following FlowFiles have not been removed or transferred: " + 
beingProcessed);
+        }
+        committed = true;
+        beingProcessed.clear();
+        currentVersions.clear();
+        originalVersions.clear();
+
+        for (final Map.Entry<String, Long> entry : counterMap.entrySet()) {
+            sharedState.adjustCounter(entry.getKey(), entry.getValue());
+        }
+
+        sharedState.addProvenanceEvents(provenanceReporter.getEvents());
+        counterMap.clear();
+    }
+
+    /**
+     * Clear the 'committed' flag so that we can test that the next iteration 
of
+     * {@link nifi.processor.Processor#onTrigger} commits or rolls back the
+     * session
+     */
+    public void clearCommited() {
+        committed = false;
+    }
+
+    /**
+     * Clear the 'rolledBack' flag so that we can test that the next iteration
+     * of {@link nifi.processor.Processor#onTrigger} commits or rolls back the
+     * session
+     */
+    public void clearRollback() {
+        rolledback = false;
+    }
+
+    @Override
+    public MockFlowFile create() {
+        final MockFlowFile flowFile = new 
MockFlowFile(sharedState.nextFlowFileId());
+        currentVersions.put(flowFile.getId(), flowFile);
+        beingProcessed.add(flowFile.getId());
+        return flowFile;
+    }
+
+    @Override
+    public MockFlowFile create(final FlowFile flowFile) {
+        MockFlowFile newFlowFile = create();
+        newFlowFile = (MockFlowFile) inheritAttributes(flowFile, newFlowFile);
+        currentVersions.put(newFlowFile.getId(), newFlowFile);
+        beingProcessed.add(newFlowFile.getId());
+        return newFlowFile;
+    }
+
+    @Override
+    public MockFlowFile create(final Collection<FlowFile> flowFiles) {
+        MockFlowFile newFlowFile = create();
+        newFlowFile = (MockFlowFile) inheritAttributes(flowFiles, newFlowFile);
+        currentVersions.put(newFlowFile.getId(), newFlowFile);
+        beingProcessed.add(newFlowFile.getId());
+        return newFlowFile;
+    }
+
+    @Override
+    public void exportTo(final FlowFile flowFile, final OutputStream out) {
+        validateState(flowFile);
+        if (flowFile == null || out == null) {
+            throw new IllegalArgumentException("arguments cannot be null");
+        }
+
+        if (!(flowFile instanceof MockFlowFile)) {
+            throw new IllegalArgumentException("Cannot export a flow file that 
I did not create");
+        }
+
+        final MockFlowFile mock = (MockFlowFile) flowFile;
+
+        try {
+            out.write(mock.getData());
+        } catch (final IOException e) {
+            throw new FlowFileAccessException(e.toString(), e);
+        }
+    }
+
+    @Override
+    public void exportTo(final FlowFile flowFile, final Path path, final 
boolean append) {
+        validateState(flowFile);
+        if (flowFile == null || path == null) {
+            throw new IllegalArgumentException("argument cannot be null");
+        }
+        if (!(flowFile instanceof MockFlowFile)) {
+            throw new IllegalArgumentException("Cannot export a flow file that 
I did not create");
+        }
+
+        final MockFlowFile mock = (MockFlowFile) flowFile;
+
+        final OpenOption mode = append ? StandardOpenOption.APPEND : 
StandardOpenOption.CREATE;
+
+        try (final OutputStream out = Files.newOutputStream(path, mode)) {
+            out.write(mock.getData());
+        } catch (final IOException e) {
+            throw new FlowFileAccessException(e.toString(), e);
+        }
+    }
+
+    @Override
+    public MockFlowFile get() {
+        final MockFlowFile flowFile = processorQueue.poll();
+        if (flowFile != null) {
+            beingProcessed.add(flowFile.getId());
+            currentVersions.put(flowFile.getId(), flowFile);
+            originalVersions.put(flowFile.getId(), flowFile);
+        }
+        return flowFile;
+    }
+
+    @Override
+    public List<FlowFile> get(final int maxResults) {
+        final List<FlowFile> flowFiles = new ArrayList<>(Math.min(500, 
maxResults));
+        for (int i = 0; i < maxResults; i++) {
+            final MockFlowFile nextFlowFile = get();
+            if (nextFlowFile == null) {
+                return flowFiles;
+            }
+
+            flowFiles.add(nextFlowFile);
+        }
+
+        return flowFiles;
+    }
+
+    @Override
+    public List<FlowFile> get(final FlowFileFilter filter) {
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        final List<MockFlowFile> unselected = new ArrayList<>();
+
+        while (true) {
+            final MockFlowFile flowFile = processorQueue.poll();
+            if (flowFile == null) {
+                break;
+            }
+
+            final FlowFileFilter.FlowFileFilterResult filterResult = 
filter.filter(flowFile);
+            if (filterResult.isAccept()) {
+                flowFiles.add(flowFile);
+
+                beingProcessed.add(flowFile.getId());
+                currentVersions.put(flowFile.getId(), flowFile);
+                originalVersions.put(flowFile.getId(), flowFile);
+            } else {
+                unselected.add(flowFile);
+            }
+
+            if (!filterResult.isContinue()) {
+                break;
+            }
+        }
+
+        processorQueue.addAll(unselected);
+        return flowFiles;
+    }
+
+    @Override
+    public QueueSize getQueueSize() {
+        return processorQueue.size();
+    }
+
+    @Override
+    public MockFlowFile importFrom(final InputStream in, final FlowFile 
flowFile) {
+        validateState(flowFile);
+        if (in == null || flowFile == null) {
+            throw new IllegalArgumentException("argument cannot be null");
+        }
+        if (!(flowFile instanceof MockFlowFile)) {
+            throw new IllegalArgumentException("Cannot export a flow file that 
I did not create");
+        }
+        final MockFlowFile mock = (MockFlowFile) flowFile;
+
+        final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), 
flowFile);
+        currentVersions.put(newFlowFile.getId(), newFlowFile);
+        try {
+            final byte[] data = readFully(in);
+            newFlowFile.setData(data);
+            return newFlowFile;
+        } catch (final IOException e) {
+            throw new FlowFileAccessException(e.toString(), e);
+        }
+    }
+
+    @Override
+    public MockFlowFile importFrom(final Path path, final boolean 
keepSourceFile, final FlowFile flowFile) {
+        validateState(flowFile);
+        if (path == null || flowFile == null) {
+            throw new IllegalArgumentException("argument cannot be null");
+        }
+        if (!(flowFile instanceof MockFlowFile)) {
+            throw new IllegalArgumentException("Cannot export a flow file that 
I did not create");
+        }
+        final MockFlowFile mock = (MockFlowFile) flowFile;
+        MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile);
+        currentVersions.put(newFlowFile.getId(), newFlowFile);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try {
+            Files.copy(path, baos);
+        } catch (final IOException e) {
+            throw new FlowFileAccessException(e.toString(), e);
+        }
+
+        newFlowFile.setData(baos.toByteArray());
+        newFlowFile = putAttribute(newFlowFile, CoreAttributes.FILENAME.key(), 
path.getFileName().toString());
+        return newFlowFile;
+    }
+
+    @Override
+    public MockFlowFile merge(final Collection<FlowFile> sources, final 
FlowFile destination) {
+        for (final FlowFile source : sources) {
+            validateState(source);
+        }
+        validateState(destination);
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        for (final FlowFile flowFile : sources) {
+            final MockFlowFile mock = (MockFlowFile) flowFile;
+            final byte[] data = mock.getData();
+            try {
+                baos.write(data);
+            } catch (final IOException e) {
+                throw new AssertionError("Failed to write to BAOS");
+            }
+        }
+
+        final MockFlowFile newFlowFile = new MockFlowFile(destination.getId(), 
destination);
+        newFlowFile.setData(baos.toByteArray());
+        currentVersions.put(newFlowFile.getId(), newFlowFile);
+
+        return newFlowFile;
+    }
+
+    @Override
+    public MockFlowFile putAllAttributes(final FlowFile flowFile, final 
Map<String, String> attrs) {
+        validateState(flowFile);
+        if (attrs == null || flowFile == null) {
+            throw new IllegalArgumentException("argument cannot be null");
+        }
+        if (!(flowFile instanceof MockFlowFile)) {
+            throw new IllegalArgumentException("Cannot update attributes of a 
flow file that I did not create");
+        }
+        final MockFlowFile mock = (MockFlowFile) flowFile;
+        final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), 
flowFile);
+        currentVersions.put(newFlowFile.getId(), newFlowFile);
+
+        newFlowFile.putAttributes(attrs);
+        return newFlowFile;
+    }
+
+    @Override
+    public MockFlowFile putAttribute(final FlowFile flowFile, final String 
attrName, final String attrValue) {
+        validateState(flowFile);
+        if (attrName == null || attrValue == null || flowFile == null) {
+            throw new IllegalArgumentException("argument cannot be null");
+        }
+        if (!(flowFile instanceof MockFlowFile)) {
+            throw new IllegalArgumentException("Cannot update attributes of a 
flow file that I did not create");
+        }
+
+        if ("uuid".equals(attrName)) {
+            Assert.fail("Should not be attempting to set FlowFile UUID via 
putAttribute. This will be ignored in production");
+        }
+
+        final MockFlowFile mock = (MockFlowFile) flowFile;
+        final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), 
flowFile);
+        currentVersions.put(newFlowFile.getId(), newFlowFile);
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put(attrName, attrValue);
+        newFlowFile.putAttributes(attrs);
+        return newFlowFile;
+    }
+
+    @Override
+    public void read(final FlowFile flowFile, final InputStreamCallback 
callback) {
+        if (callback == null || flowFile == null) {
+            throw new IllegalArgumentException("argument cannot be null");
+        }
+
+        validateState(flowFile);
+        if (!(flowFile instanceof MockFlowFile)) {
+            throw new IllegalArgumentException("Cannot export a flow file that 
I did not create");
+        }
+        final MockFlowFile mock = (MockFlowFile) flowFile;
+
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(mock.getData());
+        try {
+            callback.process(bais);
+        } catch (final IOException e) {
+            throw new ProcessException(e.toString(), e);
+        }
+    }
+
+    @Override
+    public void remove(final FlowFile flowFile) {
+        validateState(flowFile);
+        final Iterator<Long> itr = beingProcessed.iterator();
+        while (itr.hasNext()) {
+            final Long ffId = itr.next();
+            if (ffId != null && ffId.equals(flowFile.getId())) {
+                itr.remove();
+                beingProcessed.remove(ffId);
+                removedCount++;
+                currentVersions.remove(ffId);
+                return;
+            }
+        }
+
+        throw new ProcessException(flowFile + " not found in queue");
+    }
+
+    @Override
+    public void remove(final Collection<FlowFile> flowFiles) {
+        for (final FlowFile flowFile : flowFiles) {
+            validateState(flowFile);
+        }
+
+        for (final FlowFile flowFile : flowFiles) {
+            remove(flowFile);
+        }
+    }
+
+    @Override
+    public MockFlowFile removeAllAttributes(final FlowFile flowFile, final 
Set<String> attrNames) {
+        validateState(flowFile);
+        if (attrNames == null || flowFile == null) {
+            throw new IllegalArgumentException("argument cannot be null");
+        }
+        if (!(flowFile instanceof MockFlowFile)) {
+            throw new IllegalArgumentException("Cannot export a flow file that 
I did not create");
+        }
+        final MockFlowFile mock = (MockFlowFile) flowFile;
+
+        final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), 
flowFile);
+        currentVersions.put(newFlowFile.getId(), newFlowFile);
+
+        newFlowFile.removeAttributes(attrNames);
+        return newFlowFile;
+    }
+
+    @Override
+    public MockFlowFile removeAllAttributes(final FlowFile flowFile, final 
Pattern keyPattern) {
+        validateState(flowFile);
+        if (flowFile == null) {
+            throw new IllegalArgumentException("flowFile cannot be null");
+        }
+        if (keyPattern == null) {
+            return (MockFlowFile) flowFile;
+        }
+
+        final Set<String> attrsToRemove = new HashSet<>();
+        for (final String key : flowFile.getAttributes().keySet()) {
+            if (keyPattern.matcher(key).matches()) {
+                attrsToRemove.add(key);
+            }
+        }
+
+        return removeAllAttributes(flowFile, attrsToRemove);
+    }
+
+    @Override
+    public MockFlowFile removeAttribute(final FlowFile flowFile, final String 
attrName) {
+        validateState(flowFile);
+        if (attrName == null || flowFile == null) {
+            throw new IllegalArgumentException("argument cannot be null");
+        }
+        if (!(flowFile instanceof MockFlowFile)) {
+            throw new IllegalArgumentException("Cannot export a flow file that 
I did not create");
+        }
+        final MockFlowFile mock = (MockFlowFile) flowFile;
+        final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), 
flowFile);
+        currentVersions.put(newFlowFile.getId(), newFlowFile);
+
+        final Set<String> attrNames = new HashSet<>();
+        attrNames.add(attrName);
+        newFlowFile.removeAttributes(attrNames);
+        return newFlowFile;
+    }
+
+    @Override
+    public void rollback() {
+        rollback(false);
+    }
+
+    @Override
+    public void rollback(final boolean penalize) {
+        for (final List<MockFlowFile> list : transferMap.values()) {
+            for (final MockFlowFile flowFile : list) {
+                processorQueue.offer(flowFile);
+            }
+        }
+
+        for (final Long flowFileId : beingProcessed) {
+            final MockFlowFile flowFile = originalVersions.get(flowFileId);
+            if (flowFile != null) {
+                processorQueue.offer(flowFile);
+            }
+        }
+
+        rolledback = true;
+        beingProcessed.clear();
+        currentVersions.clear();
+        originalVersions.clear();
+        transferMap.clear();
+        clearTransferState();
+    }
+
+    @Override
+    public void transfer(final FlowFile flowFile) {
+        validateState(flowFile);
+        if (!(flowFile instanceof MockFlowFile)) {
+            throw new IllegalArgumentException("I only accept MockFlowFile");
+        }
+
+        beingProcessed.remove(flowFile.getId());
+        processorQueue.offer((MockFlowFile) flowFile);
+    }
+
+    @Override
+    public void transfer(final Collection<FlowFile> flowFiles) {
+        for (final FlowFile flowFile : flowFiles) {
+            transfer(flowFile);
+        }
+    }
+
+    @Override
+    public void transfer(final FlowFile flowFile, final Relationship 
relationship) {
+        if (relationship == Relationship.SELF) {
+            transfer(flowFile);
+            return;
+        }
+
+        validateState(flowFile);
+        List<MockFlowFile> list = transferMap.get(relationship);
+        if (list == null) {
+            list = new ArrayList<>();
+            transferMap.put(relationship, list);
+        }
+
+        beingProcessed.remove(flowFile.getId());
+        list.add((MockFlowFile) flowFile);
+    }
+
+    @Override
+    public void transfer(final Collection<FlowFile> flowFiles, final 
Relationship relationship) {
+        if (relationship == Relationship.SELF) {
+            transfer(flowFiles);
+            return;
+        }
+
+        for (final FlowFile flowFile : flowFiles) {
+            validateState(flowFile);
+        }
+
+        List<MockFlowFile> list = transferMap.get(relationship);
+        if (list == null) {
+            list = new ArrayList<>();
+            transferMap.put(relationship, list);
+        }
+
+        for (final FlowFile flowFile : flowFiles) {
+            beingProcessed.remove(flowFile.getId());
+            list.add((MockFlowFile) flowFile);
+        }
+    }
+
+    @Override
+    public MockFlowFile write(final FlowFile flowFile, final 
OutputStreamCallback callback) {
+        validateState(flowFile);
+        if (callback == null || flowFile == null) {
+            throw new IllegalArgumentException("argument cannot be null");
+        }
+        if (!(flowFile instanceof MockFlowFile)) {
+            throw new IllegalArgumentException("Cannot export a flow file that 
I did not create");
+        }
+        final MockFlowFile mock = (MockFlowFile) flowFile;
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try {
+            callback.process(baos);
+        } catch (final IOException e) {
+            throw new ProcessException(e.toString(), e);
+        }
+
+        final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), 
flowFile);
+        currentVersions.put(newFlowFile.getId(), newFlowFile);
+
+        newFlowFile.setData(baos.toByteArray());
+        return newFlowFile;
+    }
+
+    @Override
+    public FlowFile append(final FlowFile flowFile, final OutputStreamCallback 
callback) {
+        validateState(flowFile);
+        if (callback == null || flowFile == null) {
+            throw new IllegalArgumentException("argument cannot be null");
+        }
+        if (!(flowFile instanceof MockFlowFile)) {
+            throw new IllegalArgumentException("Cannot export a flow file that 
I did not create");
+        }
+        final MockFlowFile mock = (MockFlowFile) flowFile;
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try {
+            baos.write(mock.getData());
+            callback.process(baos);
+        } catch (final IOException e) {
+            throw new ProcessException(e.toString(), e);
+        }
+
+        final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), 
flowFile);
+        currentVersions.put(newFlowFile.getId(), newFlowFile);
+
+        newFlowFile.setData(baos.toByteArray());
+        return newFlowFile;
+    }
+
+    @Override
+    public MockFlowFile write(final FlowFile flowFile, final StreamCallback 
callback) {
+        validateState(flowFile);
+        if (callback == null || flowFile == null) {
+            throw new IllegalArgumentException("argument cannot be null");
+        }
+        if (!(flowFile instanceof MockFlowFile)) {
+            throw new IllegalArgumentException("Cannot export a flow file that 
I did not create");
+        }
+        final MockFlowFile mock = (MockFlowFile) flowFile;
+
+        final ByteArrayInputStream in = new 
ByteArrayInputStream(mock.getData());
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        try {
+            callback.process(in, out);
+        } catch (final IOException e) {
+            throw new ProcessException(e.toString(), e);
+        }
+
+        final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), 
flowFile);
+        currentVersions.put(newFlowFile.getId(), newFlowFile);
+        newFlowFile.setData(out.toByteArray());
+
+        return newFlowFile;
+    }
+
+    private byte[] readFully(final InputStream in) throws IOException {
+        final byte[] buffer = new byte[4096];
+        int len;
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        while ((len = in.read(buffer)) >= 0) {
+            baos.write(buffer, 0, len);
+        }
+
+        return baos.toByteArray();
+    }
+
+    public List<MockFlowFile> getFlowFilesForRelationship(final Relationship 
relationship) {
+        List<MockFlowFile> list = this.transferMap.get(relationship);
+        if (list == null) {
+            list = new ArrayList<>();
+        }
+
+        return list;
+    }
+
+    /**
+     * @param relationship to get flowfiles for
+     * @return a List of FlowFiles in the order in which they were transferred
+     * to the given relationship
+     */
+    public List<MockFlowFile> getFlowFilesForRelationship(final String 
relationship) {
+        final Relationship procRel = new 
Relationship.Builder().name(relationship).build();
+        return getFlowFilesForRelationship(procRel);
+    }
+
+    public MockFlowFile createFlowFile(final File file) throws IOException {
+        return createFlowFile(Files.readAllBytes(file.toPath()));
+    }
+
+    public MockFlowFile createFlowFile(final byte[] data) {
+        final MockFlowFile flowFile = create();
+        flowFile.setData(data);
+        return flowFile;
+    }
+
+    public MockFlowFile createFlowFile(final byte[] data, final Map<String, 
String> attrs) {
+        final MockFlowFile ff = createFlowFile(data);
+        ff.putAttributes(attrs);
+        return ff;
+    }
+
+    @Override
+    public MockFlowFile merge(Collection<FlowFile> sources, FlowFile 
destination, byte[] header, byte[] footer, byte[] demarcator) {
+        for (final FlowFile flowFile : sources) {
+            validateState(flowFile);
+        }
+        validateState(destination);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try {
+            if (header != null) {
+                baos.write(header);
+            }
+
+            int count = 0;
+            for (final FlowFile flowFile : sources) {
+                baos.write(((MockFlowFile) flowFile).getData());
+                if (demarcator != null && ++count != sources.size()) {
+                    baos.write(demarcator);
+                }
+            }
+
+            if (footer != null) {
+                baos.write(footer);
+            }
+        } catch (final IOException e) {
+            throw new AssertionError("failed to write data to BAOS");
+        }
+
+        final MockFlowFile newFlowFile = new MockFlowFile(destination.getId(), 
destination);
+        newFlowFile.setData(baos.toByteArray());
+        currentVersions.put(newFlowFile.getId(), newFlowFile);
+
+        return newFlowFile;
+    }
+
+    private void validateState(final FlowFile flowFile) {
+        Objects.requireNonNull(flowFile);
+        final FlowFile currentVersion = currentVersions.get(flowFile.getId());
+        if (currentVersion == null) {
+            throw new FlowFileHandlingException(flowFile + " is not known in 
this session");
+        }
+
+        if (currentVersion != flowFile) {
+            throw new FlowFileHandlingException(flowFile + " is not the most 
recent version of this flow file within this session");
+        }
+
+        for (final List<MockFlowFile> flowFiles : transferMap.values()) {
+            if (flowFiles.contains(flowFile)) {
+                throw new IllegalStateException(flowFile + " has already been 
transferred");
+            }
+        }
+    }
+
+    /**
+     * Inherits the attributes from the given source flow file into another 
flow
+     * file. The UUID of the source becomes the parent UUID of this flow file.
+     * If a parent uuid had previously been established it will be replaced by
+     * the uuid of the given source
+     *
+     * @param source the FlowFile from which to copy attributes
+     * @param destination the FlowFile to which to copy attributes
+     */
+    private FlowFile inheritAttributes(final FlowFile source, final FlowFile 
destination) {
+        if (source == null || destination == null || source == destination) {
+            return destination; //don't need to inherit from ourselves
+        }
+        final FlowFile updated = putAllAttributes(destination, 
source.getAttributes());
+        getProvenanceReporter().fork(source, 
Collections.singletonList(updated));
+        return updated;
+    }
+
+    /**
+     * Inherits the attributes from the given source flow files into the
+     * destination flow file. The UUIDs of the sources becomes the parent UUIDs
+     * of the destination flow file. Only attributes which is common to all
+     * source items is copied into this flow files attributes. Any previously
+     * established parent UUIDs will be replaced by the UUIDs of the sources. 
It
+     * will capture the uuid of a certain number of source objects and may not
+     * capture all of them. How many it will capture is unspecified.
+     *
+     * @param sources to inherit common attributes from
+     */
+    private FlowFile inheritAttributes(final Collection<FlowFile> sources, 
final FlowFile destination) {
+        final StringBuilder parentUuidBuilder = new StringBuilder();
+        int uuidsCaptured = 0;
+        for (final FlowFile source : sources) {
+            if (source == destination) {
+                continue; //don't want to capture parent uuid of this.  
Something can't be a child of itself
+            }
+            final String sourceUuid = 
source.getAttribute(CoreAttributes.UUID.key());
+            if (sourceUuid != null && !sourceUuid.trim().isEmpty()) {
+                uuidsCaptured++;
+                if (parentUuidBuilder.length() > 0) {
+                    parentUuidBuilder.append(",");
+                }
+                parentUuidBuilder.append(sourceUuid);
+            }
+
+            if (uuidsCaptured > 100) {
+                break;
+            }
+        }
+
+        final FlowFile updated = putAllAttributes(destination, 
intersectAttributes(sources));
+        getProvenanceReporter().join(sources, updated);
+        return updated;
+    }
+
+    /**
+     * Returns the attributes that are common to every flow file given. The key
+     * and value must match exactly.
+     *
+     * @param flowFileList a list of flow files
+     *
+     * @return the common attributes
+     */
+    private static Map<String, String> intersectAttributes(final 
Collection<FlowFile> flowFileList) {
+        final Map<String, String> result = new HashMap<>();
+        //trivial cases
+        if (flowFileList == null || flowFileList.isEmpty()) {
+            return result;
+        } else if (flowFileList.size() == 1) {
+            result.putAll(flowFileList.iterator().next().getAttributes());
+        }
+
+        /*
+         * Start with the first attribute map and only put an entry to the
+         * resultant map if it is common to every map.
+         */
+        final Map<String, String> firstMap = 
flowFileList.iterator().next().getAttributes();
+
+        outer:
+        for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
+            final String key = mapEntry.getKey();
+            final String value = mapEntry.getValue();
+            for (final FlowFile flowFile : flowFileList) {
+                final Map<String, String> currMap = flowFile.getAttributes();
+                final String curVal = currMap.get(key);
+                if (curVal == null || !curVal.equals(value)) {
+                    continue outer;
+                }
+            }
+            result.put(key, value);
+        }
+
+        return result;
+    }
+
+    /**
+     * Assert that {@link #commit()} has been called
+     */
+    public void assertCommitted() {
+        Assert.assertTrue("Session was not committed", committed);
+    }
+
+    /**
+     * Assert that {@link #commit()} has not been called
+     */
+    public void assertNotCommitted() {
+        Assert.assertFalse("Session was committed", committed);
+    }
+
+    /**
+     * Assert that {@link #rollback()} has been called
+     */
+    public void assertRolledBack() {
+        Assert.assertTrue("Session was not rolled back", rolledback);
+    }
+
+    /**
+     * Assert that {@link #rollback()} has not been called
+     */
+    public void assertNotRolledBack() {
+        Assert.assertFalse("Session was rolled back", rolledback);
+    }
+
+    /**
+     * Assert that the number of FlowFiles transferred to the given 
relationship
+     * is equal to the given count
+     *
+     * @param relationship to validate transfer count of
+     * @param count items transfer to given relationship
+     */
+    public void assertTransferCount(final Relationship relationship, final int 
count) {
+        final int transferCount = 
getFlowFilesForRelationship(relationship).size();
+        Assert.assertEquals("Expected " + count + " FlowFiles to be 
transferred to "
+                + relationship + " but actual transfer count was " + 
transferCount, count, transferCount);
+    }
+
+    /**
+     * Assert that the number of FlowFiles transferred to the given 
relationship
+     * is equal to the given count
+     *
+     * @param relationship to validate transfer count of
+     * @param count items transfer to given relationship
+     */
+    public void assertTransferCount(final String relationship, final int 
count) {
+        assertTransferCount(new 
Relationship.Builder().name(relationship).build(), count);
+    }
+
+    /**
+     * Assert that there are no FlowFiles left on the input queue.
+     */
+    public void assertQueueEmpty() {
+        Assert.assertTrue("FlowFile Queue has " + this.processorQueue.size() + 
" items", this.processorQueue.isEmpty());
+    }
+
+    /**
+     * Assert that at least one FlowFile is on the input queue
+     */
+    public void assertQueueNotEmpty() {
+        Assert.assertFalse("FlowFile Queue is empty", 
this.processorQueue.isEmpty());
+    }
+
+    /**
+     * Asserts that all FlowFiles that were transferred were transferred to the
+     * given relationship
+     *
+     * @param relationship to check for transferred flow files
+     */
+    public void assertAllFlowFilesTransferred(final String relationship) {
+        assertAllFlowFilesTransferred(new 
Relationship.Builder().name(relationship).build());
+    }
+
+    /**
+     * Asserts that all FlowFiles that were transferred were transferred to the
+     * given relationship
+     *
+     * @param relationship to validate
+     */
+    public void assertAllFlowFilesTransferred(final Relationship relationship) 
{
+        for (final Map.Entry<Relationship, List<MockFlowFile>> entry : 
transferMap.entrySet()) {
+            final Relationship rel = entry.getKey();
+            final List<MockFlowFile> flowFiles = entry.getValue();
+
+            if (!rel.equals(relationship) && flowFiles != null && 
!flowFiles.isEmpty()) {
+                Assert.fail("Expected all Transferred FlowFiles to go to " + 
relationship + " but " + flowFiles.size() + " were routed to " + rel);
+            }
+        }
+    }
+
+    /**
+     * Removes all state information about FlowFiles that have been transferred
+     */
+    public void clearTransferState() {
+        this.transferMap.clear();
+    }
+
+    /**
+     * Asserts that all FlowFiles that were transferred were transferred to the
+     * given relationship and that the number of FlowFiles transferred is equal
+     * to <code>count</code>
+     *
+     * @param relationship to validate
+     * @param count number of items sent to that relationship (expected)
+     */
+    public void assertAllFlowFilesTransferred(final Relationship relationship, 
final int count) {
+        assertAllFlowFilesTransferred(relationship);
+        assertTransferCount(relationship, count);
+    }
+
+    /**
+     * Asserts that all FlowFiles that were transferred were transferred to the
+     * given relationship and that the number of FlowFiles transferred is equal
+     * to <code>count</code>
+     *
+     * @param relationship to validate
+     * @param count number of items sent to that relationship (expected)
+     */
+    public void assertAllFlowFilesTransferred(final String relationship, final 
int count) {
+        assertAllFlowFilesTransferred(new 
Relationship.Builder().name(relationship).build(), count);
+    }
+
+    /**
+     * @return the number of FlowFiles that were removed
+     */
+    public int getRemovedCount() {
+        return removedCount;
+    }
+
+    @Override
+    public ProvenanceReporter getProvenanceReporter() {
+        return provenanceReporter;
+    }
+
+    @Override
+    public MockFlowFile penalize(final FlowFile flowFile) {
+        validateState(flowFile);
+        final MockFlowFile mockFlowFile = (MockFlowFile) flowFile;
+        mockFlowFile.setPenalized();
+        return mockFlowFile;
+    }
+
+    public byte[] getContentAsByteArray(final MockFlowFile flowFile) {
+        validateState(flowFile);
+        return flowFile.getData();
+    }
+
+    /**
+     * Checks if a FlowFile is known in this session.
+     *
+     * @param flowFile
+     *            the FlowFile to check
+     * @return <code>true</code> if the FlowFile is known in this session,
+     *         <code>false</code> otherwise.
+     */
+    boolean isFlowFileKnown(final FlowFile flowFile) {
+        final FlowFile curFlowFile = currentVersions.get(flowFile.getId());
+        if (curFlowFile == null) {
+            return false;
+        }
+
+        final String curUuid = 
curFlowFile.getAttribute(CoreAttributes.UUID.key());
+        final String providedUuid = 
curFlowFile.getAttribute(CoreAttributes.UUID.key());
+        if (!curUuid.equals(providedUuid)) {
+            return false;
+        }
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
new file mode 100644
index 0000000..2e5d3eb
--- /dev/null
+++ 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+
+public class MockProcessorInitializationContext implements 
ProcessorInitializationContext, ControllerServiceLookup {
+
+    private final ProcessorLog logger;
+    private final String processorId;
+    private final MockProcessContext context;
+
+    public MockProcessorInitializationContext(final Processor processor, final 
MockProcessContext context) {
+        processorId = UUID.randomUUID().toString();
+        logger = new MockProcessorLog(processorId, processor);
+        this.context = context;
+    }
+
+    @Override
+    public String getIdentifier() {
+        return processorId;
+    }
+
+    @Override
+    public ProcessorLog getLogger() {
+        return logger;
+    }
+
+    @Override
+    public Set<String> getControllerServiceIdentifiers(final Class<? extends 
ControllerService> serviceType) {
+        return context.getControllerServiceIdentifiers(serviceType);
+    }
+
+    @Override
+    public ControllerService getControllerService(final String identifier) {
+        return context.getControllerService(identifier);
+    }
+
+    @Override
+    public ControllerServiceLookup getControllerServiceLookup() {
+        return this;
+    }
+
+    @Override
+    public String getControllerServiceName(String serviceIdentifier) {
+        return context.getControllerServiceName(serviceIdentifier);
+    }
+
+    @Override
+    public boolean isControllerServiceEnabled(String serviceIdentifier) {
+        return context.isControllerServiceEnabled(serviceIdentifier);
+    }
+
+    @Override
+    public boolean isControllerServiceEnabled(ControllerService service) {
+        return context.isControllerServiceEnabled(service);
+    }
+
+    @Override
+    public boolean isControllerServiceEnabling(String serviceIdentifier) {
+        return context.isControllerServiceEnabling(serviceIdentifier);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorLog.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorLog.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorLog.java
new file mode 100644
index 0000000..837784b
--- /dev/null
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorLog.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import org.apache.nifi.logging.ProcessorLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MockProcessorLog implements ProcessorLog {
+
+    private final Logger logger;
+    private final Object component;
+
+    public MockProcessorLog(final String componentId, final Object component) {
+        this.logger = LoggerFactory.getLogger(component.getClass());
+        this.component = component;
+    }
+
+    private Object[] addProcessor(final Object[] originalArgs) {
+        return prependToArgs(originalArgs, component);
+    }
+
+    private Object[] addProcessorAndThrowable(final Object[] os, final 
Throwable t) {
+        final Object[] modifiedArgs = new Object[os.length + 2];
+        modifiedArgs[0] = component.toString();
+        for (int i = 0; i < os.length; i++) {
+            modifiedArgs[i + 1] = os[i];
+        }
+        modifiedArgs[modifiedArgs.length - 1] = t.toString();
+
+        return modifiedArgs;
+    }
+
+    private Object[] prependToArgs(final Object[] originalArgs, final 
Object... toAdd) {
+        final Object[] newArgs = new Object[originalArgs.length + 
toAdd.length];
+        System.arraycopy(toAdd, 0, newArgs, 0, toAdd.length);
+        System.arraycopy(originalArgs, 0, newArgs, toAdd.length, 
originalArgs.length);
+        return newArgs;
+    }
+
+    private Object[] translateException(final Object[] os) {
+        if (os != null && os.length > 0 && (os[os.length - 1] instanceof 
Throwable)) {
+            final Object[] osCopy = new Object[os.length];
+            osCopy[osCopy.length - 1] = os[os.length - 1].toString();
+            System.arraycopy(os, 0, osCopy, 0, os.length - 1);
+            return osCopy;
+        }
+        return os;
+    }
+
+    private boolean lastArgIsException(final Object[] os) {
+        return (os != null && os.length > 0 && (os[os.length - 1] instanceof 
Throwable));
+    }
+
+    @Override
+    public void warn(final String msg, final Throwable t) {
+        warn("{} " + msg, new Object[]{component}, t);
+    }
+
+    @Override
+    public void warn(String msg, Object[] os) {
+        if (lastArgIsException(os)) {
+            warn(msg, translateException(os), (Throwable) os[os.length - 1]);
+        } else {
+            msg = "{} " + msg;
+            os = addProcessor(os);
+            logger.warn(msg, os);
+        }
+    }
+
+    @Override
+    public void warn(String msg, Object[] os, final Throwable t) {
+        os = addProcessorAndThrowable(os, t);
+        msg = "{} " + msg + ": {}";
+
+        logger.warn(msg, os);
+        if (logger.isDebugEnabled()) {
+            logger.warn("", t);
+        }
+    }
+
+    @Override
+    public void warn(String msg) {
+        msg = "{} " + msg;
+        logger.warn(msg, component);
+    }
+
+    @Override
+    public void trace(String msg, Throwable t) {
+        msg = "{} " + msg;
+        final Object[] os = {component};
+        logger.trace(msg, os, t);
+    }
+
+    @Override
+    public void trace(String msg, Object[] os) {
+        msg = "{} " + msg;
+        os = addProcessor(os);
+        logger.trace(msg, os);
+    }
+
+    @Override
+    public void trace(String msg) {
+        msg = "{} " + msg;
+        final Object[] os = {component};
+        logger.trace(msg, os);
+    }
+
+    @Override
+    public void trace(String msg, Object[] os, Throwable t) {
+        os = addProcessorAndThrowable(os, t);
+        msg = "{} " + msg + ": {}";
+
+        logger.trace(msg, os);
+        logger.trace("", t);
+    }
+
+    @Override
+    public boolean isWarnEnabled() {
+        return logger.isWarnEnabled();
+    }
+
+    @Override
+    public boolean isTraceEnabled() {
+        return logger.isTraceEnabled();
+    }
+
+    @Override
+    public boolean isInfoEnabled() {
+        return logger.isInfoEnabled();
+    }
+
+    @Override
+    public boolean isErrorEnabled() {
+        return logger.isErrorEnabled();
+    }
+
+    @Override
+    public boolean isDebugEnabled() {
+        return logger.isDebugEnabled();
+    }
+
+    @Override
+    public void info(String msg, Throwable t) {
+        msg = "{} " + msg;
+        final Object[] os = {component};
+
+        logger.info(msg, os);
+        if (logger.isDebugEnabled()) {
+            logger.info("", t);
+        }
+    }
+
+    @Override
+    public void info(String msg, Object[] os) {
+        msg = "{} " + msg;
+        os = addProcessor(os);
+
+        logger.info(msg, os);
+    }
+
+    @Override
+    public void info(String msg) {
+        msg = "{} " + msg;
+        final Object[] os = {component};
+
+        logger.info(msg, os);
+    }
+
+    @Override
+    public void info(String msg, Object[] os, Throwable t) {
+        os = addProcessorAndThrowable(os, t);
+        msg = "{} " + msg + ": {}";
+
+        logger.info(msg, os);
+        if (logger.isDebugEnabled()) {
+            logger.info("", t);
+        }
+    }
+
+    @Override
+    public String getName() {
+        return logger.getName();
+    }
+
+    @Override
+    public void error(String msg, Throwable t) {
+        msg = "{} " + msg;
+        final Object[] os = {component};
+
+        logger.error(msg, os, t);
+        if (logger.isDebugEnabled()) {
+            logger.error("", t);
+        }
+    }
+
+    @Override
+    public void error(String msg, Object[] os) {
+        if (lastArgIsException(os)) {
+            error(msg, translateException(os), (Throwable) os[os.length - 1]);
+        } else {
+            os = addProcessor(os);
+            msg = "{} " + msg;
+            logger.error(msg, os);
+        }
+    }
+
+    @Override
+    public void error(String msg) {
+        msg = "{} " + msg;
+        final Object[] os = {component};
+
+        logger.error(msg, os);
+    }
+
+    @Override
+    public void error(String msg, Object[] os, Throwable t) {
+        os = addProcessorAndThrowable(os, t);
+        msg = "{} " + msg + ": {}";
+
+        logger.error(msg, os);
+        if (logger.isDebugEnabled()) {
+            logger.error("", t);
+        }
+    }
+
+    @Override
+    public void debug(String msg, Throwable t) {
+        msg = "{} " + msg;
+        final Object[] os = {component};
+
+        logger.debug(msg, os, t);
+    }
+
+    @Override
+    public void debug(String msg, Object[] os) {
+        os = addProcessor(os);
+        msg = "{} " + msg;
+
+        logger.debug(msg, os);
+    }
+
+    @Override
+    public void debug(String msg, Object[] os, Throwable t) {
+        os = addProcessorAndThrowable(os, t);
+        msg = "{} " + msg + ": {}";
+
+        logger.debug(msg, os);
+        if (logger.isDebugEnabled()) {
+            logger.debug("", t);
+        }
+    }
+
+    @Override
+    public void debug(String msg) {
+        msg = "{} " + msg;
+        final Object[] os = {component};
+
+        logger.debug(msg, os);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java
new file mode 100644
index 0000000..12436d4
--- /dev/null
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.attribute.expression.language.Query;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.expression.AttributeValueDecorator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.exception.ProcessException;
+
+public class MockPropertyValue implements PropertyValue {
+
+    private final String rawValue;
+    private final Boolean expectExpressions;
+    private final ControllerServiceLookup serviceLookup;
+    private final PropertyDescriptor propertyDescriptor;
+    private boolean expressionsEvaluated = false;
+
+    public MockPropertyValue(final String rawValue, final 
ControllerServiceLookup serviceLookup) {
+        this(rawValue, serviceLookup, null);
+    }
+
+    public MockPropertyValue(final String rawValue, final 
ControllerServiceLookup serviceLookup, final PropertyDescriptor 
propertyDescriptor) {
+        this.rawValue = rawValue;
+        this.serviceLookup = serviceLookup;
+        this.expectExpressions = propertyDescriptor == null ? null : 
propertyDescriptor.isExpressionLanguageSupported();
+        this.propertyDescriptor = propertyDescriptor;
+    }
+
+    private void ensureExpressionsEvaluated() {
+        if (Boolean.TRUE.equals(expectExpressions) && !expressionsEvaluated) {
+            throw new IllegalStateException("Attempting to retrieve value of " 
+ propertyDescriptor
+                    + " without first evaluating Expressions, even though the 
PropertyDescriptor indicates "
+                    + "that the Expression Language is Supported. If you 
realize that this is the case and do not want "
+                    + "this error to occur, it can be disabled by calling 
TestRunner.setValidateExpressionUsage(false)");
+        }
+    }
+
+    @Override
+    public String getValue() {
+        ensureExpressionsEvaluated();
+        return rawValue;
+    }
+
+    @Override
+    public Integer asInteger() {
+        ensureExpressionsEvaluated();
+        return (rawValue == null) ? null : Integer.parseInt(rawValue.trim());
+    }
+
+    @Override
+    public Long asLong() {
+        ensureExpressionsEvaluated();
+        return (rawValue == null) ? null : Long.parseLong(rawValue.trim());
+    }
+
+    @Override
+    public Boolean asBoolean() {
+        ensureExpressionsEvaluated();
+        return (rawValue == null) ? null : 
Boolean.parseBoolean(rawValue.trim());
+    }
+
+    @Override
+    public Float asFloat() {
+        ensureExpressionsEvaluated();
+        return (rawValue == null) ? null : Float.parseFloat(rawValue.trim());
+    }
+
+    @Override
+    public Double asDouble() {
+        ensureExpressionsEvaluated();
+        return (rawValue == null) ? null : Double.parseDouble(rawValue.trim());
+    }
+
+    @Override
+    public Long asTimePeriod(final TimeUnit timeUnit) {
+        ensureExpressionsEvaluated();
+        return (rawValue == null) ? null : 
FormatUtils.getTimeDuration(rawValue.trim(), timeUnit);
+    }
+
+    @Override
+    public Double asDataSize(final DataUnit dataUnit) {
+        ensureExpressionsEvaluated();
+        return rawValue == null ? null : 
DataUnit.parseDataSize(rawValue.trim(), dataUnit);
+    }
+
+    private void markEvaluated() {
+        if (Boolean.FALSE.equals(expectExpressions)) {
+            throw new IllegalStateException("Attempting to Evaluate 
Expressions but " + propertyDescriptor
+                    + " indicates that the Expression Language is not 
supported. If you realize that this is the case and do not want "
+                    + "this error to occur, it can be disabled by calling 
TestRunner.setValidateExpressionUsage(false)");
+        }
+        expressionsEvaluated = true;
+    }
+
+    @Override
+    public PropertyValue evaluateAttributeExpressions() throws 
ProcessException {
+        markEvaluated();
+        if (rawValue == null) {
+            return this;
+        }
+        return evaluateAttributeExpressions(null, null);
+    }
+
+    @Override
+    public PropertyValue evaluateAttributeExpressions(final 
AttributeValueDecorator decorator) throws ProcessException {
+        markEvaluated();
+        if (rawValue == null) {
+            return this;
+        }
+        return evaluateAttributeExpressions(null, decorator);
+    }
+
+    @Override
+    public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile) 
throws ProcessException {
+        markEvaluated();
+        if (rawValue == null) {
+            return this;
+        }
+        return evaluateAttributeExpressions(flowFile, null);
+    }
+
+    @Override
+    public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, 
final AttributeValueDecorator decorator) throws ProcessException {
+        markEvaluated();
+        if (rawValue == null) {
+            return this;
+        }
+        return new MockPropertyValue(Query.evaluateExpressions(rawValue, 
flowFile, decorator), serviceLookup);
+    }
+
+    @Override
+    public ControllerService asControllerService() {
+        ensureExpressionsEvaluated();
+        if (rawValue == null || rawValue.equals("")) {
+            return null;
+        }
+
+        return serviceLookup.getControllerService(rawValue);
+    }
+
+    @Override
+    public <T extends ControllerService> T asControllerService(final Class<T> 
serviceType) throws IllegalArgumentException {
+        ensureExpressionsEvaluated();
+        if (rawValue == null || rawValue.equals("")) {
+            return null;
+        }
+
+        final ControllerService service = 
serviceLookup.getControllerService(rawValue);
+        if (serviceType.isAssignableFrom(service.getClass())) {
+            return serviceType.cast(service);
+        }
+        throw new IllegalArgumentException("Controller Service with identifier 
" + rawValue + " is of type " + service.getClass() + " and cannot be cast to " 
+ serviceType);
+    }
+
+    @Override
+    public boolean isSet() {
+        return rawValue != null;
+    }
+
+    @Override
+    public String toString() {
+        return getValue();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
new file mode 100644
index 0000000..8c9a320
--- /dev/null
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileHandlingException;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.ProvenanceReporter;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MockProvenanceReporter implements ProvenanceReporter {
+    private static final Logger logger = 
LoggerFactory.getLogger(MockProvenanceReporter.class);
+    private final MockProcessSession session;
+    private final String processorId;
+    private final String processorType;
+    private final SharedSessionState sharedSessionState;
+    private final Set<ProvenanceEventRecord> events = new LinkedHashSet<>();
+
+    public MockProvenanceReporter(final MockProcessSession session, final 
SharedSessionState sharedState, final String processorId, final String 
processorType) {
+        this.session = session;
+        this.sharedSessionState = sharedState;
+        this.processorId = processorId;
+        this.processorType = processorType;
+    }
+
+    private void verifyFlowFileKnown(final FlowFile flowFile) {
+        if (session != null && !session.isFlowFileKnown(flowFile)) {
+            throw new FlowFileHandlingException(flowFile + " is not known to " 
+ session);
+        }
+    }
+
+    Set<ProvenanceEventRecord> getEvents() {
+        return Collections.unmodifiableSet(events);
+    }
+
+    /**
+     * Removes the given event from the reporter
+     *
+     * @param event
+     *            event
+     */
+    void remove(final ProvenanceEventRecord event) {
+        events.remove(event);
+    }
+
+    void clear() {
+        events.clear();
+    }
+
+    /**
+     * Generates a Fork event for the given child and parents but does not
+     * register the event. This is useful so that a ProcessSession has the
+     * ability to de-dupe events, since one or more events may be created by 
the
+     * session itself, as well as by the Processor
+     *
+     * @param parents
+     *            parents
+     * @param child
+     *            child
+     * @return record
+     */
+    ProvenanceEventRecord generateJoinEvent(final Collection<FlowFile> 
parents, final FlowFile child) {
+        final ProvenanceEventBuilder eventBuilder = build(child, 
ProvenanceEventType.JOIN);
+        eventBuilder.addChildFlowFile(child);
+
+        for (final FlowFile parent : parents) {
+            eventBuilder.addParentFlowFile(parent);
+        }
+
+        return eventBuilder.build();
+    }
+
+    ProvenanceEventRecord generateDropEvent(final FlowFile flowFile, final 
String details) {
+        return build(flowFile, 
ProvenanceEventType.DROP).setDetails(details).build();
+    }
+
+    @Override
+    public void receive(final FlowFile flowFile, final String transitUri) {
+        receive(flowFile, transitUri, -1L);
+    }
+
+    @Override
+    public void receive(FlowFile flowFile, String transitUri, String 
sourceSystemFlowFileIdentifier) {
+        receive(flowFile, transitUri, sourceSystemFlowFileIdentifier, -1L);
+    }
+
+    @Override
+    public void receive(final FlowFile flowFile, final String transitUri, 
final long transmissionMillis) {
+        receive(flowFile, transitUri, null, transmissionMillis);
+    }
+
+    @Override
+    public void receive(final FlowFile flowFile, final String transitUri, 
final String sourceSystemFlowFileIdentifier, final long transmissionMillis) {
+        receive(flowFile, transitUri, sourceSystemFlowFileIdentifier, null, 
transmissionMillis);
+    }
+
+    @Override
+    public void receive(final FlowFile flowFile, final String transitUri, 
final String sourceSystemFlowFileIdentifier, final String details, final long 
transmissionMillis) {
+        verifyFlowFileKnown(flowFile);
+
+        try {
+            final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.RECEIVE)
+                
.setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build();
+            events.add(record);
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
+    }
+
+    @Override
+    public void send(final FlowFile flowFile, final String transitUri, final 
long transmissionMillis) {
+        send(flowFile, transitUri, transmissionMillis, true);
+    }
+
+    @Override
+    public void send(final FlowFile flowFile, final String transitUri) {
+        send(flowFile, transitUri, null, -1L, true);
+    }
+
+    @Override
+    public void send(final FlowFile flowFile, final String transitUri, final 
String details) {
+        send(flowFile, transitUri, details, -1L, true);
+    }
+
+    @Override
+    public void send(final FlowFile flowFile, final String transitUri, final 
long transmissionMillis, final boolean force) {
+        send(flowFile, transitUri, null, transmissionMillis, force);
+    }
+
+    @Override
+    public void send(final FlowFile flowFile, final String transitUri, final 
String details, final boolean force) {
+        send(flowFile, transitUri, details, -1L, force);
+    }
+
+    @Override
+    public void send(final FlowFile flowFile, final String transitUri, final 
String details, final long transmissionMillis) {
+        send(flowFile, transitUri, details, transmissionMillis, true);
+    }
+
+    @Override
+    public void send(final FlowFile flowFile, final String transitUri, final 
String details, final long transmissionMillis, final boolean force) {
+        try {
+            final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.SEND).setTransitUri(transitUri).setEventDuration(transmissionMillis).setDetails(details).build();
+            if (force) {
+                
sharedSessionState.addProvenanceEvents(Collections.singleton(record));
+            } else {
+                events.add(record);
+            }
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
+    }
+
+    @Override
+    public void send(final FlowFile flowFile, final String transitUri, final 
boolean force) {
+        send(flowFile, transitUri, -1L, true);
+    }
+
+    @Override
+    public void associate(final FlowFile flowFile, final String 
alternateIdentifierNamespace, final String alternateIdentifier) {
+        try {
+            String trimmedNamespace = alternateIdentifierNamespace.trim();
+            if (trimmedNamespace.endsWith(":")) {
+                trimmedNamespace = trimmedNamespace.substring(0, 
trimmedNamespace.length() - 1);
+            }
+
+            String trimmedIdentifier = alternateIdentifier.trim();
+            if (trimmedIdentifier.startsWith(":")) {
+                if (trimmedIdentifier.length() == 1) {
+                    throw new IllegalArgumentException("Illegal 
alternateIdentifier: " + alternateIdentifier);
+                }
+                trimmedIdentifier = trimmedIdentifier.substring(1);
+            }
+
+            final String alternateIdentifierUri = trimmedNamespace + ":" + 
trimmedIdentifier;
+            final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.ADDINFO).setAlternateIdentifierUri(alternateIdentifierUri).build();
+            events.add(record);
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
+    }
+
+    ProvenanceEventRecord drop(final FlowFile flowFile, final String reason) {
+        try {
+            final ProvenanceEventBuilder builder = build(flowFile, 
ProvenanceEventType.DROP);
+            if (reason != null) {
+                builder.setDetails("Discard reason: " + reason);
+            }
+            final ProvenanceEventRecord record = builder.build();
+            events.add(record);
+            return record;
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+            return null;
+        }
+    }
+
+    void expire(final FlowFile flowFile, final String details) {
+        try {
+            final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.EXPIRE).setDetails(details).build();
+            events.add(record);
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
+    }
+
+    @Override
+    public void fork(final FlowFile parent, final Collection<FlowFile> 
children) {
+        fork(parent, children, null, -1L);
+    }
+
+    @Override
+    public void fork(final FlowFile parent, final Collection<FlowFile> 
children, final long forkDuration) {
+        fork(parent, children, null, forkDuration);
+    }
+
+    @Override
+    public void fork(final FlowFile parent, final Collection<FlowFile> 
children, final String details) {
+        fork(parent, children, details, -1L);
+    }
+
+    @Override
+    public void fork(final FlowFile parent, final Collection<FlowFile> 
children, final String details, final long forkDuration) {
+        verifyFlowFileKnown(parent);
+
+        try {
+            final ProvenanceEventBuilder eventBuilder = build(parent, 
ProvenanceEventType.FORK);
+            eventBuilder.addParentFlowFile(parent);
+            for (final FlowFile child : children) {
+                eventBuilder.addChildFlowFile(child);
+            }
+
+            if (forkDuration > -1L) {
+                eventBuilder.setEventDuration(forkDuration);
+            }
+
+            if (details != null) {
+                eventBuilder.setDetails(details);
+            }
+
+            events.add(eventBuilder.build());
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
+    }
+
+    @Override
+    public void join(final Collection<FlowFile> parents, final FlowFile child) 
{
+        join(parents, child, null, -1L);
+    }
+
+    @Override
+    public void join(final Collection<FlowFile> parents, final FlowFile child, 
final long joinDuration) {
+        join(parents, child, null, joinDuration);
+    }
+
+    @Override
+    public void join(final Collection<FlowFile> parents, final FlowFile child, 
final String details) {
+        join(parents, child, details, -1L);
+    }
+
+    @Override
+    public void join(final Collection<FlowFile> parents, final FlowFile child, 
final String details, final long joinDuration) {
+        verifyFlowFileKnown(child);
+
+        try {
+            final ProvenanceEventBuilder eventBuilder = build(child, 
ProvenanceEventType.JOIN);
+            eventBuilder.addChildFlowFile(child);
+            eventBuilder.setDetails(details);
+
+            for (final FlowFile parent : parents) {
+                eventBuilder.addParentFlowFile(parent);
+            }
+
+            events.add(eventBuilder.build());
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
+    }
+
+    @Override
+    public void clone(final FlowFile parent, final FlowFile child) {
+        verifyFlowFileKnown(child);
+
+        try {
+            final ProvenanceEventBuilder eventBuilder = build(parent, 
ProvenanceEventType.CLONE);
+            eventBuilder.addChildFlowFile(child);
+            eventBuilder.addParentFlowFile(parent);
+            events.add(eventBuilder.build());
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
+    }
+
+    @Override
+    public void modifyContent(final FlowFile flowFile) {
+        modifyContent(flowFile, null, -1L);
+    }
+
+    @Override
+    public void modifyContent(final FlowFile flowFile, final String details) {
+        modifyContent(flowFile, details, -1L);
+    }
+
+    @Override
+    public void modifyContent(final FlowFile flowFile, final long 
processingMillis) {
+        modifyContent(flowFile, null, processingMillis);
+    }
+
+    @Override
+    public void modifyContent(final FlowFile flowFile, final String details, 
final long processingMillis) {
+        verifyFlowFileKnown(flowFile);
+
+        try {
+            final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.CONTENT_MODIFIED).setEventDuration(processingMillis).setDetails(details).build();
+            events.add(record);
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
+    }
+
+    @Override
+    public void modifyAttributes(final FlowFile flowFile) {
+        modifyAttributes(flowFile, null);
+    }
+
+    @Override
+    public void modifyAttributes(final FlowFile flowFile, final String 
details) {
+        verifyFlowFileKnown(flowFile);
+
+        try {
+            final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.ATTRIBUTES_MODIFIED).setDetails(details).build();
+            events.add(record);
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
+    }
+
+    @Override
+    public void route(final FlowFile flowFile, final Relationship 
relationship) {
+        route(flowFile, relationship, null);
+    }
+
+    @Override
+    public void route(final FlowFile flowFile, final Relationship 
relationship, final long processingDuration) {
+        route(flowFile, relationship, null, processingDuration);
+    }
+
+    @Override
+    public void route(final FlowFile flowFile, final Relationship 
relationship, final String details) {
+        route(flowFile, relationship, details, -1L);
+    }
+
+    @Override
+    public void route(final FlowFile flowFile, final Relationship 
relationship, final String details, final long processingDuration) {
+        verifyFlowFileKnown(flowFile);
+
+        try {
+            final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.ROUTE).setRelationship(relationship).setDetails(details).setEventDuration(processingDuration).build();
+            events.add(record);
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
+    }
+
+    @Override
+    public void create(final FlowFile flowFile) {
+        create(flowFile, null);
+    }
+
+    @Override
+    public void create(final FlowFile flowFile, final String details) {
+        verifyFlowFileKnown(flowFile);
+
+        try {
+            final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.CREATE).setDetails(details).build();
+            events.add(record);
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
+    }
+
+    ProvenanceEventBuilder build(final FlowFile flowFile, final 
ProvenanceEventType eventType) {
+        final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
+        builder.setEventType(eventType);
+        builder.fromFlowFile(flowFile);
+        builder.setLineageStartDate(flowFile.getLineageStartDate());
+        builder.setComponentId(processorId);
+        builder.setComponentType(processorType);
+        return builder;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
new file mode 100644
index 0000000..63a9876
--- /dev/null
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinFactory;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.Severity;
+
+public class MockReportingContext extends MockControllerServiceLookup 
implements ReportingContext, ControllerServiceLookup {
+
+    private final Map<String, ControllerServiceConfiguration> 
controllerServices;
+    private final MockEventAccess eventAccess = new MockEventAccess();
+    private final Map<PropertyDescriptor, String> properties = new HashMap<>();
+
+    private final Map<String, List<Bulletin>> componentBulletinsCreated = new 
HashMap<>();
+
+    public MockReportingContext(final Map<String, ControllerService> 
controllerServices) {
+        this.controllerServices = new HashMap<>();
+        for (final Map.Entry<String, ControllerService> entry : 
controllerServices.entrySet()) {
+            this.controllerServices.put(entry.getKey(), new 
ControllerServiceConfiguration(entry.getValue()));
+        }
+    }
+
+    @Override
+    public Map<PropertyDescriptor, String> getProperties() {
+        return Collections.unmodifiableMap(properties);
+    }
+
+    @Override
+    public PropertyValue getProperty(final PropertyDescriptor property) {
+        final String configuredValue = properties.get(property);
+        return new MockPropertyValue(configuredValue == null ? 
property.getDefaultValue() : configuredValue, this);
+    }
+
+    public void setProperty(final String propertyName, final String value) {
+        this.properties.put(new 
PropertyDescriptor.Builder().name(propertyName).build(), value);
+    }
+
+    public void setProperties(final Map<PropertyDescriptor, String> 
properties) {
+        this.properties.clear();
+        this.properties.putAll(properties);
+    }
+
+    @Override
+    public MockEventAccess getEventAccess() {
+        return eventAccess;
+    }
+
+    @Override
+    public BulletinRepository getBulletinRepository() {
+        return new MockBulletinRepository();
+    }
+
+    @Override
+    public Bulletin createBulletin(final String category, final Severity 
severity, final String message) {
+        return BulletinFactory.createBulletin(category, severity.name(), 
message);
+    }
+
+    @Override
+    public Bulletin createBulletin(final String componentId, final String 
category, final Severity severity, final String message) {
+        final Bulletin bulletin = BulletinFactory.createBulletin(null, 
componentId, "test processor", category, severity.name(), message);
+        List<Bulletin> bulletins = componentBulletinsCreated.get(componentId);
+        if (bulletins == null) {
+            bulletins = new ArrayList<>();
+            componentBulletinsCreated.put(componentId, bulletins);
+        }
+        bulletins.add(bulletin);
+        return bulletin;
+    }
+
+    @Override
+    public ControllerServiceLookup getControllerServiceLookup() {
+        return this;
+    }
+
+    /**
+     * @param componentId identifier of component to get bulletins for
+     * @return all Bulletins that have been created for the component with the
+     * given ID
+     */
+    public List<Bulletin> getComponentBulletins(final String componentId) {
+        final List<Bulletin> created = 
componentBulletinsCreated.get(componentId);
+        if (created == null) {
+            return new ArrayList<>();
+        }
+
+        return new ArrayList<>(created);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java
 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java
new file mode 100644
index 0000000..0aea00a
--- /dev/null
+++ 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+public class MockReportingInitializationContext extends 
MockControllerServiceLookup implements ReportingInitializationContext, 
ControllerServiceLookup {
+
+    private final String identifier;
+    private final String name;
+    private final Map<PropertyDescriptor, String> properties = new HashMap<>();
+    private final ComponentLog logger;
+
+    public MockReportingInitializationContext(final String identifier, final 
String name, final ComponentLog logger) {
+        this.identifier = identifier;
+        this.name = name;
+        this.logger = logger;
+    }
+
+    @Override
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public long getSchedulingPeriod(final TimeUnit timeUnit) {
+        return 1L;
+    }
+
+    public void setProperty(final String propertyName, final String value) {
+        setProperty(new 
PropertyDescriptor.Builder().name(propertyName).build(), value);
+    }
+
+    public void setProperty(final PropertyDescriptor propertyName, final 
String value) {
+        this.properties.put(propertyName, value);
+    }
+
+    public void setProperties(final Map<PropertyDescriptor, String> 
properties) {
+        this.properties.clear();
+        this.properties.putAll(properties);
+    }
+
+    @Override
+    public ControllerServiceLookup getControllerServiceLookup() {
+        return this;
+    }
+
+    @Override
+    public String getSchedulingPeriod() {
+        return "0 sec";
+    }
+
+    @Override
+    public SchedulingStrategy getSchedulingStrategy() {
+        return SchedulingStrategy.TIMER_DRIVEN;
+    }
+
+    @Override
+    public ComponentLog getLogger() {
+        return logger;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
new file mode 100644
index 0000000..49b8796
--- /dev/null
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Processor;
+
+public class MockSessionFactory implements ProcessSessionFactory {
+
+    private final Processor processor;
+    private final SharedSessionState sharedState;
+    private final Set<MockProcessSession> createdSessions = new 
CopyOnWriteArraySet<>();
+
+    MockSessionFactory(final SharedSessionState sharedState, final Processor 
processor) {
+        this.sharedState = sharedState;
+        this.processor = processor;
+    }
+
+    @Override
+    public ProcessSession createSession() {
+        final MockProcessSession session = new MockProcessSession(sharedState, 
processor);
+        createdSessions.add(session);
+        return session;
+    }
+
+    Set<MockProcessSession> getCreatedSessions() {
+        return Collections.unmodifiableSet(createdSessions);
+    }
+}

Reply via email to