http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProcessContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProcessContext.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProcessContext.java
deleted file mode 100644
index 4d0d850..0000000
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProcessContext.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.nifi.connectable.Connectable;
-import org.apache.nifi.connectable.ConnectableType;
-import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.ProcessorNode;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.provenance.ProvenanceEventRepository;
-import org.apache.nifi.util.Connectables;
-
-/**
- *
- */
-public class ProcessContext {
-
-    private final Connectable connectable;
-    private final ContentRepository contentRepo;
-    private final FlowFileRepository flowFileRepo;
-    private final FlowFileEventRepository flowFileEventRepo;
-    private final CounterRepository counterRepo;
-    private final ProvenanceEventRepository provenanceRepo;
-    private final AtomicLong connectionIndex;
-
-    public ProcessContext(final Connectable connectable, final AtomicLong 
connectionIndex, final ContentRepository contentRepository,
-            final FlowFileRepository flowFileRepository, final 
FlowFileEventRepository flowFileEventRepository,
-            final CounterRepository counterRepository, final 
ProvenanceEventRepository provenanceRepository) {
-        this.connectable = connectable;
-        contentRepo = contentRepository;
-        flowFileRepo = flowFileRepository;
-        flowFileEventRepo = flowFileEventRepository;
-        counterRepo = counterRepository;
-        provenanceRepo = provenanceRepository;
-
-        this.connectionIndex = connectionIndex;
-    }
-
-    Connectable getConnectable() {
-        return connectable;
-    }
-
-    /**
-     *
-     * @param relationship relationship
-     * @return connections for relationship
-     */
-    Collection<Connection> getConnections(final Relationship relationship) {
-        Collection<Connection> collection = 
connectable.getConnections(relationship);
-        if (collection == null) {
-            collection = new ArrayList<>();
-        }
-        return Collections.unmodifiableCollection(collection);
-    }
-
-    /**
-     * @return an unmodifiable list containing a copy of all incoming 
connections for the processor from which FlowFiles are allowed to be pulled
-     */
-    List<Connection> getPollableConnections() {
-        if (pollFromSelfLoopsOnly()) {
-            final List<Connection> selfLoops = new ArrayList<>();
-            for (final Connection connection : 
connectable.getIncomingConnections()) {
-                if (connection.getSource() == connection.getDestination()) {
-                    selfLoops.add(connection);
-                }
-            }
-
-            return selfLoops;
-        } else {
-            return connectable.getIncomingConnections();
-        }
-    }
-
-    private boolean isTriggerWhenAnyDestinationAvailable() {
-        if (connectable.getConnectableType() != ConnectableType.PROCESSOR) {
-            return false;
-        }
-
-        final ProcessorNode procNode = (ProcessorNode) connectable;
-        return procNode.isTriggerWhenAnyDestinationAvailable();
-    }
-
-    /**
-     * @return true if we are allowed to take FlowFiles only from self-loops. 
This is the case when no Relationships are available except for self-looping 
Connections
-     */
-    private boolean pollFromSelfLoopsOnly() {
-        if (isTriggerWhenAnyDestinationAvailable()) {
-            // we can pull from any incoming connection, as long as at least 
one downstream connection
-            // is available for each relationship.
-            // I.e., we can poll only from self if no relationships are 
available
-            return !Connectables.anyRelationshipAvailable(connectable);
-        } else {
-            for (final Connection connection : connectable.getConnections()) {
-                // A downstream connection is full. We are only allowed to 
pull from self-loops.
-                if (connection.getFlowFileQueue().isFull()) {
-                    return true;
-                }
-            }
-        }
-
-        return false;
-    }
-
-    void adjustCounter(final String name, final long delta) {
-        final String localContext = connectable.getName() + " (" + 
connectable.getIdentifier() + ")";
-        final String globalContext = "All " + connectable.getComponentType() + 
"'s";
-
-        counterRepo.adjustCounter(localContext, name, delta);
-        counterRepo.adjustCounter(globalContext, name, delta);
-    }
-
-    ContentRepository getContentRepository() {
-        return contentRepo;
-    }
-
-    FlowFileRepository getFlowFileRepository() {
-        return flowFileRepo;
-    }
-
-    public FlowFileEventRepository getFlowFileEventRepository() {
-        return flowFileEventRepo;
-    }
-
-    ProvenanceEventRepository getProvenanceRepository() {
-        return provenanceRepo;
-    }
-
-    long getNextFlowFileSequence() {
-        return flowFileRepo.getNextFlowFileSequence();
-    }
-
-    int getNextIncomingConnectionIndex() {
-        final int numIncomingConnections = 
connectable.getIncomingConnections().size();
-        return (int) (connectionIndex.getAndIncrement() % Math.max(1, 
numIncomingConnections));
-    }
-
-    public boolean isAnyRelationshipAvailable() {
-        for (final Relationship relationship : 
getConnectable().getRelationships()) {
-            final Collection<Connection> connections = 
getConnections(relationship);
-
-            boolean available = true;
-            for (final Connection connection : connections) {
-                if (connection.getFlowFileQueue().isFull()) {
-                    available = false;
-                    break;
-                }
-            }
-
-            if (available) {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    public int getAvailableRelationshipCount() {
-        int count = 0;
-        for (final Relationship relationship : connectable.getRelationships()) 
{
-            final Collection<Connection> connections = 
connectable.getConnections(relationship);
-            if (connections == null || connections.isEmpty()) {
-                count++;
-            } else {
-                boolean available = true;
-                for (final Connection connection : connections) {
-                    // consider self-loops available
-                    if (connection.getSource() == connection.getDestination()) 
{
-                        continue;
-                    }
-
-                    if (connection.getFlowFileQueue().isFull()) {
-                        available = false;
-                        break;
-                    }
-                }
-
-                if (available) {
-                    count++;
-                }
-            }
-        }
-
-        return count;
-    }
-
-    /**
-     * A Relationship is said to be Available if and only if all Connections 
for that Relationship are either self-loops or have non-full queues.
-     *
-     * @param requiredNumber minimum number of relationships that must have 
availability
-     * @return Checks if at least <code>requiredNumber</code> of 
Relationationships are "available." If so, returns <code>true</code>, otherwise 
returns <code>false</code>
-     */
-    public boolean isRelationshipAvailabilitySatisfied(final int 
requiredNumber) {
-        int unavailable = 0;
-
-        final Collection<Relationship> allRelationships = 
connectable.getRelationships();
-        final int numRelationships = allRelationships.size();
-
-        // the maximum number of Relationships that can be unavailable and 
still return true.
-        final int maxUnavailable = numRelationships - requiredNumber;
-
-        for (final Relationship relationship : allRelationships) {
-            final Collection<Connection> connections = 
connectable.getConnections(relationship);
-            if (connections != null && !connections.isEmpty()) {
-                boolean available = true;
-                for (final Connection connection : connections) {
-                    // consider self-loops available
-                    if (connection.getSource() == connection.getDestination()) 
{
-                        continue;
-                    }
-
-                    if (connection.getFlowFileQueue().isFull()) {
-                        available = false;
-                        break;
-                    }
-                }
-
-                if (!available) {
-                    unavailable++;
-                    if (unavailable > maxUnavailable) {
-                        return false;
-                    }
-                }
-            }
-        }
-
-        return true;
-    }
-
-    public Set<Relationship> getAvailableRelationships() {
-        final Set<Relationship> set = new HashSet<>();
-        for (final Relationship relationship : 
getConnectable().getRelationships()) {
-            final Collection<Connection> connections = 
getConnections(relationship);
-            if (connections.isEmpty()) {
-                set.add(relationship);
-            } else {
-                boolean available = true;
-                for (final Connection connection : connections) {
-                    if (connection.getFlowFileQueue().isFull()) {
-                        available = false;
-                    }
-                }
-
-                if (available) {
-                    set.add(relationship);
-                }
-            }
-        }
-
-        return set;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java
new file mode 100644
index 0000000..a407371
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.util.Connectables;
+
+/**
+ *
+ */
+public class RepositoryContext {
+
+    private final Connectable connectable;
+    private final ContentRepository contentRepo;
+    private final FlowFileRepository flowFileRepo;
+    private final FlowFileEventRepository flowFileEventRepo;
+    private final CounterRepository counterRepo;
+    private final ProvenanceEventRepository provenanceRepo;
+    private final AtomicLong connectionIndex;
+
+    public RepositoryContext(final Connectable connectable, final AtomicLong 
connectionIndex, final ContentRepository contentRepository,
+            final FlowFileRepository flowFileRepository, final 
FlowFileEventRepository flowFileEventRepository,
+            final CounterRepository counterRepository, final 
ProvenanceEventRepository provenanceRepository) {
+        this.connectable = connectable;
+        contentRepo = contentRepository;
+        flowFileRepo = flowFileRepository;
+        flowFileEventRepo = flowFileEventRepository;
+        counterRepo = counterRepository;
+        provenanceRepo = provenanceRepository;
+
+        this.connectionIndex = connectionIndex;
+    }
+
+    Connectable getConnectable() {
+        return connectable;
+    }
+
+    /**
+     *
+     * @param relationship relationship
+     * @return connections for relationship
+     */
+    Collection<Connection> getConnections(final Relationship relationship) {
+        Collection<Connection> collection = 
connectable.getConnections(relationship);
+        if (collection == null) {
+            collection = new ArrayList<>();
+        }
+        return Collections.unmodifiableCollection(collection);
+    }
+
+    /**
+     * @return an unmodifiable list containing a copy of all incoming 
connections for the processor from which FlowFiles are allowed to be pulled
+     */
+    List<Connection> getPollableConnections() {
+        if (pollFromSelfLoopsOnly()) {
+            final List<Connection> selfLoops = new ArrayList<>();
+            for (final Connection connection : 
connectable.getIncomingConnections()) {
+                if (connection.getSource() == connection.getDestination()) {
+                    selfLoops.add(connection);
+                }
+            }
+
+            return selfLoops;
+        } else {
+            return connectable.getIncomingConnections();
+        }
+    }
+
+    private boolean isTriggerWhenAnyDestinationAvailable() {
+        if (connectable.getConnectableType() != ConnectableType.PROCESSOR) {
+            return false;
+        }
+
+        final ProcessorNode procNode = (ProcessorNode) connectable;
+        return procNode.isTriggerWhenAnyDestinationAvailable();
+    }
+
+    /**
+     * @return true if we are allowed to take FlowFiles only from self-loops. 
This is the case when no Relationships are available except for self-looping 
Connections
+     */
+    private boolean pollFromSelfLoopsOnly() {
+        if (isTriggerWhenAnyDestinationAvailable()) {
+            // we can pull from any incoming connection, as long as at least 
one downstream connection
+            // is available for each relationship.
+            // I.e., we can poll only from self if no relationships are 
available
+            return !Connectables.anyRelationshipAvailable(connectable);
+        } else {
+            for (final Connection connection : connectable.getConnections()) {
+                // A downstream connection is full. We are only allowed to 
pull from self-loops.
+                if (connection.getFlowFileQueue().isFull()) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
+    void adjustCounter(final String name, final long delta) {
+        final String localContext = connectable.getName() + " (" + 
connectable.getIdentifier() + ")";
+        final String globalContext = "All " + connectable.getComponentType() + 
"'s";
+
+        counterRepo.adjustCounter(localContext, name, delta);
+        counterRepo.adjustCounter(globalContext, name, delta);
+    }
+
+    ContentRepository getContentRepository() {
+        return contentRepo;
+    }
+
+    FlowFileRepository getFlowFileRepository() {
+        return flowFileRepo;
+    }
+
+    public FlowFileEventRepository getFlowFileEventRepository() {
+        return flowFileEventRepo;
+    }
+
+    ProvenanceEventRepository getProvenanceRepository() {
+        return provenanceRepo;
+    }
+
+    long getNextFlowFileSequence() {
+        return flowFileRepo.getNextFlowFileSequence();
+    }
+
+    int getNextIncomingConnectionIndex() {
+        final int numIncomingConnections = 
connectable.getIncomingConnections().size();
+        return (int) (connectionIndex.getAndIncrement() % Math.max(1, 
numIncomingConnections));
+    }
+
+    public boolean isAnyRelationshipAvailable() {
+        for (final Relationship relationship : 
getConnectable().getRelationships()) {
+            final Collection<Connection> connections = 
getConnections(relationship);
+
+            boolean available = true;
+            for (final Connection connection : connections) {
+                if (connection.getFlowFileQueue().isFull()) {
+                    available = false;
+                    break;
+                }
+            }
+
+            if (available) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    public int getAvailableRelationshipCount() {
+        int count = 0;
+        for (final Relationship relationship : connectable.getRelationships()) 
{
+            final Collection<Connection> connections = 
connectable.getConnections(relationship);
+            if (connections == null || connections.isEmpty()) {
+                count++;
+            } else {
+                boolean available = true;
+                for (final Connection connection : connections) {
+                    // consider self-loops available
+                    if (connection.getSource() == connection.getDestination()) 
{
+                        continue;
+                    }
+
+                    if (connection.getFlowFileQueue().isFull()) {
+                        available = false;
+                        break;
+                    }
+                }
+
+                if (available) {
+                    count++;
+                }
+            }
+        }
+
+        return count;
+    }
+
+    /**
+     * A Relationship is said to be Available if and only if all Connections 
for that Relationship are either self-loops or have non-full queues.
+     *
+     * @param requiredNumber minimum number of relationships that must have 
availability
+     * @return Checks if at least <code>requiredNumber</code> of 
Relationationships are "available." If so, returns <code>true</code>, otherwise 
returns <code>false</code>
+     */
+    public boolean isRelationshipAvailabilitySatisfied(final int 
requiredNumber) {
+        int unavailable = 0;
+
+        final Collection<Relationship> allRelationships = 
connectable.getRelationships();
+        final int numRelationships = allRelationships.size();
+
+        // the maximum number of Relationships that can be unavailable and 
still return true.
+        final int maxUnavailable = numRelationships - requiredNumber;
+
+        for (final Relationship relationship : allRelationships) {
+            final Collection<Connection> connections = 
connectable.getConnections(relationship);
+            if (connections != null && !connections.isEmpty()) {
+                boolean available = true;
+                for (final Connection connection : connections) {
+                    // consider self-loops available
+                    if (connection.getSource() == connection.getDestination()) 
{
+                        continue;
+                    }
+
+                    if (connection.getFlowFileQueue().isFull()) {
+                        available = false;
+                        break;
+                    }
+                }
+
+                if (!available) {
+                    unavailable++;
+                    if (unavailable > maxUnavailable) {
+                        return false;
+                    }
+                }
+            }
+        }
+
+        return true;
+    }
+
+    public Set<Relationship> getAvailableRelationships() {
+        final Set<Relationship> set = new HashSet<>();
+        for (final Relationship relationship : 
getConnectable().getRelationships()) {
+            final Collection<Connection> connections = 
getConnections(relationship);
+            if (connections.isEmpty()) {
+                set.add(relationship);
+            } else {
+                boolean available = true;
+                for (final Connection connection : connections) {
+                    if (connection.getFlowFileQueue().isFull()) {
+                        available = false;
+                    }
+                }
+
+                if (available) {
+                    set.add(relationship);
+                }
+            }
+        }
+
+        return set;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 24fbf72..eed4dbe 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -16,10 +16,38 @@
  */
 package org.apache.nifi.controller.repository;
 
-import org.apache.commons.io.IOUtils;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.lifecycle.TaskTermination;
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
@@ -30,6 +58,8 @@ import 
org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream;
 import org.apache.nifi.controller.repository.io.FlowFileAccessInputStream;
 import org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream;
 import org.apache.nifi.controller.repository.io.LimitedInputStream;
+import org.apache.nifi.controller.repository.io.TaskTerminationInputStream;
+import org.apache.nifi.controller.repository.io.TaskTerminationOutputStream;
 import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -40,6 +70,7 @@ import 
org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.FlowFileHandlingException;
 import org.apache.nifi.processor.exception.MissingFlowFileException;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.exception.TerminatedTaskException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.io.StreamCallback;
@@ -55,33 +86,6 @@ import org.apache.nifi.stream.io.StreamUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
 /**
  * <p>
  * Provides a ProcessSession that ensures all accesses, changes and transfers
@@ -106,11 +110,12 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     private static final Logger claimLog = 
LoggerFactory.getLogger(StandardProcessSession.class.getSimpleName() + 
".claims");
     private static final int MAX_ROLLBACK_FLOWFILES_TO_LOG = 5;
 
-    private final Map<FlowFileRecord, StandardRepositoryRecord> records = new 
HashMap<>();
-    private final Map<String, StandardFlowFileEvent> connectionCounts = new 
HashMap<>();
-    private final Map<FlowFileQueue, Set<FlowFileRecord>> 
unacknowledgedFlowFiles = new HashMap<>();
-    private final Map<ContentClaim, ByteCountingOutputStream> 
appendableStreams = new HashMap<>();
-    private final ProcessContext context;
+    private final Map<FlowFileRecord, StandardRepositoryRecord> records = new 
ConcurrentHashMap<>();
+    private final Map<String, StandardFlowFileEvent> connectionCounts = new 
ConcurrentHashMap<>();
+    private final Map<FlowFileQueue, Set<FlowFileRecord>> 
unacknowledgedFlowFiles = new ConcurrentHashMap<>();
+    private final Map<ContentClaim, ByteCountingOutputStream> 
appendableStreams = new ConcurrentHashMap<>();
+    private final RepositoryContext context;
+    private final TaskTermination taskTermination;
     private final Map<FlowFile, Integer> readRecursionSet = new HashMap<>();// 
set used to track what is currently being operated on to prevent logic failures 
if recursive calls occurring
     private final Set<FlowFile> writeRecursionSet = new HashSet<>();
     private final Map<FlowFile, Path> deleteOnCommit = new HashMap<>();
@@ -137,9 +142,9 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     private long processingStartTime;
 
     // List of InputStreams that have been opened by calls to {@link 
#read(FlowFile)} and not yet closed
-    private final Map<FlowFile, InputStream> openInputStreams = new 
HashMap<>();
+    private final Map<FlowFile, InputStream> openInputStreams = new 
ConcurrentHashMap<>();
     // List of OutputStreams that have been opened by calls to {@link 
#write(FlowFile)} and not yet closed
-    private final Map<FlowFile, OutputStream> openOutputStreams = new 
HashMap<>();
+    private final Map<FlowFile, OutputStream> openOutputStreams = new 
ConcurrentHashMap<>();
 
     // maps a FlowFile to all Provenance Events that were generated for that 
FlowFile.
     // we do this so that if we generate a Fork event, for example, and then 
remove the event in the same
@@ -153,8 +158,9 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     private Checkpoint checkpoint = new Checkpoint();
     private final ContentClaimWriteCache claimCache;
 
-    public StandardProcessSession(final ProcessContext context) {
+    public StandardProcessSession(final RepositoryContext context, final 
TaskTermination taskTermination) {
         this.context = context;
+        this.taskTermination = taskTermination;
 
         final Connectable connectable = context.getConnectable();
         final String componentType;
@@ -194,6 +200,13 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         processingStartTime = System.nanoTime();
     }
 
+    private void verifyTaskActive() {
+        if (taskTermination.isTerminated()) {
+            rollback(false, true);
+            throw new TerminatedTaskException();
+        }
+    }
+
     private void closeStreams(final Map<FlowFile, ? extends Closeable> 
streamMap, final String action, final String streamType) {
         final Map<FlowFile, ? extends Closeable> openStreamCopy = new 
HashMap<>(streamMap); // avoid ConcurrentModificationException by creating a 
copy of the List
         for (final Map.Entry<FlowFile, ? extends Closeable> entry : 
openStreamCopy.entrySet()) {
@@ -212,7 +225,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     }
 
     public void checkpoint() {
-
+        verifyTaskActive();
         resetWriteClaims(false);
 
         closeStreams(openInputStreams, "committed", "input");
@@ -318,6 +331,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public void commit() {
+        verifyTaskActive();
         checkpoint();
         commit(this.checkpoint);
         this.checkpoint = null;
@@ -783,6 +797,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public StandardProvenanceEventRecord enrich(final ProvenanceEventRecord 
rawEvent, final FlowFile flowFile) {
+        verifyTaskActive();
+
         final StandardRepositoryRecord repoRecord = records.get(flowFile);
         if (repoRecord == null) {
             throw new FlowFileHandlingException(flowFile + " is not known in 
this session (" + toString() + ")");
@@ -927,6 +943,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     @Override
     public void rollback(final boolean penalize) {
         rollback(penalize, false);
+        verifyTaskActive();
     }
 
     private void rollback(final boolean penalize, final boolean 
rollbackCheckpoint) {
@@ -1153,6 +1170,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public void migrate(final ProcessSession newOwner, final 
Collection<FlowFile> flowFiles) {
+        verifyTaskActive();
+
         if (Objects.requireNonNull(newOwner) == this) {
             throw new IllegalArgumentException("Cannot migrate FlowFiles from 
a Process Session to itself");
         }
@@ -1469,6 +1488,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public void adjustCounter(final String name, final long delta, final 
boolean immediate) {
+        verifyTaskActive();
+
         final Map<String, Long> counters;
         if (immediate) {
             if (immediateCounters == null) {
@@ -1501,6 +1522,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public FlowFile get() {
+        verifyTaskActive();
         final List<Connection> connections = context.getPollableConnections();
         final int numConnections = connections.size();
         for (int numAttempts = 0; numAttempts < numConnections; numAttempts++) 
{
@@ -1520,6 +1542,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public List<FlowFile> get(final int maxResults) {
+        verifyTaskActive();
+
         if (maxResults < 0) {
             throw new IllegalArgumentException();
         }
@@ -1554,6 +1578,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public List<FlowFile> get(final FlowFileFilter filter) {
+        verifyTaskActive();
+
         return get(new ConnectionPoller() {
             @Override
             public List<FlowFileRecord> poll(final Connection connection, 
final Set<FlowFileRecord> expiredRecords) {
@@ -1562,31 +1588,6 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         }, true);
     }
 
-    private List<FlowFile> get(final Connection connection, final 
ConnectionPoller poller, final boolean lockQueue) {
-        if (lockQueue) {
-            connection.lock();
-        }
-
-        try {
-            final Set<FlowFileRecord> expired = new HashSet<>();
-            final List<FlowFileRecord> newlySelected = poller.poll(connection, 
expired);
-            removeExpired(expired, connection);
-
-            if (newlySelected.isEmpty() && expired.isEmpty()) {
-                return new ArrayList<>();
-            }
-
-            for (final FlowFileRecord flowFile : newlySelected) {
-                registerDequeuedRecord(flowFile, connection);
-            }
-
-            return new ArrayList<>(newlySelected);
-        } finally {
-            if (lockQueue) {
-                connection.unlock();
-            }
-        }
-    }
 
     private List<FlowFile> get(final ConnectionPoller poller, final boolean 
lockAllQueues) {
         final List<Connection> connections = context.getPollableConnections();
@@ -1630,6 +1631,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public QueueSize getQueueSize() {
+        verifyTaskActive();
+
         int flowFileCount = 0;
         long byteCount = 0L;
         for (final Connection conn : context.getPollableConnections()) {
@@ -1642,6 +1645,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public FlowFile create() {
+        verifyTaskActive();
+
         final Map<String, String> attrs = new HashMap<>();
         attrs.put(CoreAttributes.FILENAME.key(), 
String.valueOf(System.nanoTime()));
         attrs.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
@@ -1659,12 +1664,15 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public FlowFile clone(FlowFile example) {
+        verifyTaskActive();
         example = validateRecordState(example);
         return clone(example, 0L, example.getSize());
     }
 
     @Override
     public FlowFile clone(FlowFile example, final long offset, final long 
size) {
+        verifyTaskActive();
+
         example = validateRecordState(example);
         final StandardRepositoryRecord exampleRepoRecord = 
records.get(example);
         final FlowFileRecord currRec = exampleRepoRecord.getCurrent();
@@ -1734,6 +1742,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public FlowFile penalize(FlowFile flowFile) {
+        verifyTaskActive();
+
         flowFile = validateRecordState(flowFile);
         final StandardRepositoryRecord record = records.get(flowFile);
         final long expirationEpochMillis = System.currentTimeMillis() + 
context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS);
@@ -1744,6 +1754,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public FlowFile putAttribute(FlowFile flowFile, final String key, final 
String value) {
+        verifyTaskActive();
         flowFile = validateRecordState(flowFile);
 
         if (CoreAttributes.UUID.key().equals(key)) {
@@ -1759,6 +1770,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public FlowFile putAllAttributes(FlowFile flowFile, final Map<String, 
String> attributes) {
+        verifyTaskActive();
+
         flowFile = validateRecordState(flowFile);
         final StandardRepositoryRecord record = records.get(flowFile);
 
@@ -1779,6 +1792,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public FlowFile removeAttribute(FlowFile flowFile, final String key) {
+        verifyTaskActive();
         flowFile = validateRecordState(flowFile);
 
         if (CoreAttributes.UUID.key().equals(key)) {
@@ -1793,6 +1807,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public FlowFile removeAllAttributes(FlowFile flowFile, final Set<String> 
keys) {
+        verifyTaskActive();
         flowFile = validateRecordState(flowFile);
 
         if (keys == null) {
@@ -1817,6 +1832,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public FlowFile removeAllAttributes(FlowFile flowFile, final Pattern 
keyPattern) {
+        verifyTaskActive();
+
         flowFile = validateRecordState(flowFile);
         final StandardRepositoryRecord record = records.get(flowFile);
         final FlowFileRecord newFile = new 
StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keyPattern).build();
@@ -1851,6 +1868,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public void transfer(FlowFile flowFile, final Relationship relationship) {
+        verifyTaskActive();
         flowFile = validateRecordState(flowFile);
         final int numDestinations = 
context.getConnections(relationship).size();
         final int multiplier = Math.max(1, numDestinations);
@@ -1881,6 +1899,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public void transfer(FlowFile flowFile) {
+        verifyTaskActive();
+
         flowFile = validateRecordState(flowFile);
         final StandardRepositoryRecord record = records.get(flowFile);
         if (record.getOriginalQueue() == null) {
@@ -1899,6 +1919,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public void transfer(Collection<FlowFile> flowFiles, final Relationship 
relationship) {
+        verifyTaskActive();
         flowFiles = validateRecordState(flowFiles);
 
         boolean autoTerminated = false;
@@ -1936,6 +1957,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public void remove(FlowFile flowFile) {
+        verifyTaskActive();
+
         flowFile = validateRecordState(flowFile);
         final StandardRepositoryRecord record = records.get(flowFile);
         record.markForDelete();
@@ -1957,6 +1980,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public void remove(Collection<FlowFile> flowFiles) {
+        verifyTaskActive();
+
         flowFiles = validateRecordState(flowFiles);
         for (final FlowFile flowFile : flowFiles) {
             final StandardRepositoryRecord record = records.get(flowFile);
@@ -2126,7 +2151,12 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 try {
                     StreamUtils.skip(rawInStream, offset);
                 } catch(IOException ioe) {
-                    IOUtils.closeQuietly(rawInStream);
+                    try {
+                        rawInStream.close();
+                    } catch (final Exception e) {
+                        ioe.addSuppressed(ioe);
+                    }
+
                     throw ioe;
                 }
                 return rawInStream;
@@ -2147,6 +2177,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public void read(FlowFile source, boolean allowSessionStreamManagement, 
InputStreamCallback reader) {
+        verifyTaskActive();
+
         source = validateRecordState(source, true);
         final StandardRepositoryRecord record = records.get(source);
 
@@ -2172,7 +2204,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
             try {
                 incrementReadCount(source);
-                reader.process(ffais);
+                reader.process(createTaskTerminationStream(ffais));
 
                 // Allow processors to close the file after reading to avoid 
too many files open or do smart session stream management.
                 if (this.currentReadClaimStream != null && 
!allowSessionStreamManagement) {
@@ -2201,6 +2233,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public InputStream read(FlowFile source) {
+        verifyTaskActive();
+
         source = validateRecordState(source, true);
         final StandardRepositoryRecord record = records.get(source);
 
@@ -2302,7 +2336,16 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
         incrementReadCount(sourceFlowFile);
         openInputStreams.put(sourceFlowFile, errorHandlingStream);
-        return errorHandlingStream;
+
+        return createTaskTerminationStream(errorHandlingStream);
+    }
+
+    private InputStream createTaskTerminationStream(final InputStream 
delegate) {
+        return new TaskTerminationInputStream(delegate, taskTermination, () -> 
rollback(false, true));
+    }
+
+    private OutputStream createTaskTerminationStream(final OutputStream 
delegate) {
+        return new TaskTerminationOutputStream(delegate, taskTermination, () 
-> rollback(false, true));
     }
 
     private void incrementReadCount(final FlowFile flowFile) {
@@ -2325,11 +2368,15 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public FlowFile merge(final Collection<FlowFile> sources, final FlowFile 
destination) {
+        verifyTaskActive();
+
         return merge(sources, destination, null, null, null);
     }
 
     @Override
     public FlowFile merge(Collection<FlowFile> sources, FlowFile destination, 
final byte[] header, final byte[] footer, final byte[] demarcator) {
+        verifyTaskActive();
+
         sources = validateRecordState(sources);
         destination = validateRecordState(destination);
         if (sources.contains(destination)) {
@@ -2431,6 +2478,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public OutputStream write(FlowFile source) {
+        verifyTaskActive();
         source = validateRecordState(source);
         final StandardRepositoryRecord record = records.get(source);
 
@@ -2530,7 +2578,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
             writeRecursionSet.add(source);
             openOutputStreams.put(source, errorHandlingOutputStream);
-            return errorHandlingOutputStream;
+            return createTaskTerminationStream(errorHandlingOutputStream);
         } catch (final ContentNotFoundException nfe) {
             resetWriteClaims(); // need to reset write claim before we can 
remove the claim
             destroyContent(newClaim);
@@ -2553,6 +2601,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public FlowFile write(FlowFile source, final OutputStreamCallback writer) {
+        verifyTaskActive();
         source = validateRecordState(source);
         final StandardRepositoryRecord record = records.get(source);
 
@@ -2568,7 +2617,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 final ByteCountingOutputStream countingOut = new 
ByteCountingOutputStream(disableOnClose)) {
                 try {
                     writeRecursionSet.add(source);
-                    writer.process(new FlowFileAccessOutputStream(countingOut, 
source));
+                    final OutputStream ffaos = new 
FlowFileAccessOutputStream(countingOut, source);
+                    writer.process(createTaskTerminationStream(ffaos));
                 } finally {
                     writtenToFlowFile = countingOut.getBytesWritten();
                     bytesWritten += countingOut.getBytesWritten();
@@ -2609,6 +2659,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public FlowFile append(FlowFile source, final OutputStreamCallback writer) 
{
+        verifyTaskActive();
+
         source = validateRecordState(source);
         final StandardRepositoryRecord record = records.get(source);
         long newSize = 0L;
@@ -2616,7 +2668,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         // Get the current Content Claim from the record and see if we already 
have
         // an OutputStream that we can append to.
         final ContentClaim oldClaim = record.getCurrentClaim();
-        ByteCountingOutputStream outStream = appendableStreams.get(oldClaim);
+        ByteCountingOutputStream outStream = oldClaim == null ? null : 
appendableStreams.get(oldClaim);
         long originalByteWrittenCount = 0;
 
         ContentClaim newClaim = null;
@@ -2789,6 +2841,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public FlowFile write(FlowFile source, final StreamCallback writer) {
+        verifyTaskActive();
         source = validateRecordState(source);
         final StandardRepositoryRecord record = records.get(source);
         final ContentClaim currClaim = record.getCurrentClaim();
@@ -2821,10 +2874,11 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 // ContentNotFoundException because if it is thrown, the 
Processor code may catch it and do something else with it
                 // but in reality, if it is thrown, we want to know about it 
and handle it, even if the Processor code catches it.
                 final FlowFileAccessInputStream ffais = new 
FlowFileAccessInputStream(countingIn, source, currClaim);
+                final FlowFileAccessOutputStream ffaos = new 
FlowFileAccessOutputStream(countingOut, source);
                 boolean cnfeThrown = false;
 
                 try {
-                    writer.process(ffais, new 
FlowFileAccessOutputStream(countingOut, source));
+                    writer.process(createTaskTerminationStream(ffais), 
createTaskTerminationStream(ffaos));
                 } catch (final ContentNotFoundException cnfe) {
                     cnfeThrown = true;
                     throw cnfe;
@@ -2868,6 +2922,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public FlowFile importFrom(final Path source, final boolean 
keepSourceFile, FlowFile destination) {
+        verifyTaskActive();
+
         destination = validateRecordState(destination);
         // TODO: find a better solution. With Windows 7 and Java 7 (very early 
update, at least), Files.isWritable(source.getParent()) returns false, even 
when it should be true.
         if (!keepSourceFile && !Files.isWritable(source.getParent()) && 
!source.getParent().toFile().canWrite()) {
@@ -2918,6 +2974,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public FlowFile importFrom(final InputStream source, FlowFile destination) 
{
+        verifyTaskActive();
+
         destination = validateRecordState(destination);
         final StandardRepositoryRecord record = records.get(destination);
         ContentClaim newClaim = null;
@@ -2929,7 +2987,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 newClaim = 
context.getContentRepository().create(context.getConnectable().isLossTolerant());
                 claimLog.debug("Creating ContentClaim {} for 'importFrom' for 
{}", newClaim, destination);
 
-                newSize = context.getContentRepository().importFrom(source, 
newClaim);
+                newSize = 
context.getContentRepository().importFrom(createTaskTerminationStream(source), 
newClaim);
                 bytesWritten += newSize;
             } catch (final IOException e) {
                 throw new FlowFileAccessException("Unable to create 
ContentClaim due to " + e.toString(), e);
@@ -2955,6 +3013,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public void exportTo(FlowFile source, final Path destination, final 
boolean append) {
+        verifyTaskActive();
         source = validateRecordState(source);
         final StandardRepositoryRecord record = records.get(source);
         try {
@@ -2973,6 +3032,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public void exportTo(FlowFile source, final OutputStream destination) {
+        verifyTaskActive();
         source = validateRecordState(source);
         final StandardRepositoryRecord record = records.get(source);
 
@@ -2997,25 +3057,24 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             // and translates into either FlowFileAccessException or 
ContentNotFoundException. We keep track of any
             // ContentNotFoundException because if it is thrown, the Processor 
code may catch it and do something else with it
             // but in reality, if it is thrown, we want to know about it and 
handle it, even if the Processor code catches it.
-            final FlowFileAccessInputStream ffais = new 
FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
-            boolean cnfeThrown = false;
+            try (final FlowFileAccessInputStream ffais = new 
FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim())) {
+                boolean cnfeThrown = false;
 
-            try {
-                incrementReadCount(source);
-                StreamUtils.copy(ffais, destination, source.getSize());
-            } catch (final ContentNotFoundException cnfe) {
-                cnfeThrown = true;
-                throw cnfe;
-            } finally {
-                decrementReadCount(source);
+                try {
+                    incrementReadCount(source);
+                    StreamUtils.copy(ffais, 
createTaskTerminationStream(destination), source.getSize());
+                } catch (final ContentNotFoundException cnfe) {
+                    cnfeThrown = true;
+                    throw cnfe;
+                } finally {
+                    decrementReadCount(source);
 
-                IOUtils.closeQuietly(ffais);
-                // if cnfeThrown is true, we don't need to re-throw the 
Exception; it will propagate.
-                if (!cnfeThrown && ffais.getContentNotFoundException() != 
null) {
-                    throw ffais.getContentNotFoundException();
+                    // if cnfeThrown is true, we don't need to re-throw the 
Exception; it will propagate.
+                    if (!cnfeThrown && ffais.getContentNotFoundException() != 
null) {
+                        throw ffais.getContentNotFoundException();
+                    }
                 }
             }
-
         } catch (final ContentNotFoundException nfe) {
             handleContentNotFound(nfe, record);
         } catch (final IOException ex) {
@@ -3106,6 +3165,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public FlowFile create(FlowFile parent) {
+        verifyTaskActive();
         parent = getMostRecent(parent);
 
         final Map<String, String> newAttributes = new HashMap<>(3);
@@ -3144,6 +3204,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public FlowFile create(Collection<FlowFile> parents) {
+        verifyTaskActive();
+
         parents = 
parents.stream().map(this::getMostRecent).collect(Collectors.toList());
 
         final Map<String, String> newAttributes = intersectAttributes(parents);
@@ -3229,12 +3291,13 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     protected void finalize() throws Throwable {
-        rollback();
+        rollback(false, false);
         super.finalize();
     }
 
     @Override
     public ProvenanceReporter getProvenanceReporter() {
+        verifyTaskActive();
         return provenanceReporter;
     }
 
@@ -3261,9 +3324,9 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         private final List<ProvenanceEventRecord> autoTerminatedEvents = new 
ArrayList<>();
         private final Set<ProvenanceEventRecord> reportedEvents = new 
LinkedHashSet<>();
 
-        private final Map<FlowFileRecord, StandardRepositoryRecord> records = 
new HashMap<>();
-        private final Map<String, StandardFlowFileEvent> connectionCounts = 
new HashMap<>();
-        private final Map<FlowFileQueue, Set<FlowFileRecord>> 
unacknowledgedFlowFiles = new HashMap<>();
+        private final Map<FlowFileRecord, StandardRepositoryRecord> records = 
new ConcurrentHashMap<>();
+        private final Map<String, StandardFlowFileEvent> connectionCounts = 
new ConcurrentHashMap<>();
+        private final Map<FlowFileQueue, Set<FlowFileRecord>> 
unacknowledgedFlowFiles = new ConcurrentHashMap<>();
 
         private Map<String, Long> countersOnCommit = new HashMap<>();
         private Map<String, Long> immediateCounters = new HashMap<>();
@@ -3294,18 +3357,10 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             
this.unacknowledgedFlowFiles.putAll(session.unacknowledgedFlowFiles);
 
             if (session.countersOnCommit != null) {
-                if (this.countersOnCommit == null) {
-                    this.countersOnCommit = new HashMap<>();
-                }
-
                 this.countersOnCommit.putAll(session.countersOnCommit);
             }
 
             if (session.immediateCounters != null) {
-                if (this.immediateCounters == null) {
-                    this.immediateCounters = new HashMap<>();
-                }
-
                 this.immediateCounters.putAll(session.immediateCounters);
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSessionFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSessionFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSessionFactory.java
index 76cecbb..7e4d5b0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSessionFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSessionFactory.java
@@ -16,18 +16,21 @@
  */
 package org.apache.nifi.controller.repository;
 
+import org.apache.nifi.controller.lifecycle.TaskTermination;
 import org.apache.nifi.processor.ProcessSessionFactory;
 
 public class StandardProcessSessionFactory implements ProcessSessionFactory {
 
-    private final ProcessContext context;
+    private final RepositoryContext context;
+    private final TaskTermination taskTermination;
 
-    public StandardProcessSessionFactory(final ProcessContext context) {
+    public StandardProcessSessionFactory(final RepositoryContext context, 
final TaskTermination taskTermination) {
         this.context = context;
+        this.taskTermination = taskTermination;
     }
 
     @Override
     public StandardProcessSession createSession() {
-        return new StandardProcessSession(context);
+        return new StandardProcessSession(context, taskTermination);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WeakHashMapProcessSessionFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WeakHashMapProcessSessionFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WeakHashMapProcessSessionFactory.java
new file mode 100644
index 0000000..cda65bb
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WeakHashMapProcessSessionFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.TerminatedTaskException;
+
+public class WeakHashMapProcessSessionFactory implements 
ActiveProcessSessionFactory {
+    private final ProcessSessionFactory delegate;
+    private final Map<ProcessSession, Boolean> sessionMap = new 
WeakHashMap<>();
+    private boolean terminated = false;
+
+    public WeakHashMapProcessSessionFactory(final ProcessSessionFactory 
delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public synchronized ProcessSession createSession() {
+        if (terminated) {
+            throw new TerminatedTaskException();
+        }
+
+        final ProcessSession session = delegate.createSession();
+        sessionMap.put(session, Boolean.TRUE);
+        return session;
+    }
+
+    @Override
+    public synchronized void terminateActiveSessions() {
+        terminated = true;
+        for (final ProcessSession session : sessionMap.keySet()) {
+            try {
+                session.rollback();
+            } catch (final TerminatedTaskException tte) {
+                // ignore
+            }
+        }
+
+        sessionMap.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java
index 6b608ac..bf3a870 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java
@@ -20,16 +20,16 @@ package org.apache.nifi.controller.repository.claim;
 import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.nifi.controller.repository.ContentRepository;
 
 public class ContentClaimWriteCache {
     private final ContentRepository contentRepo;
-    private final Map<ResourceClaim, OutputStream> streamMap = new HashMap<>();
+    private final Map<ResourceClaim, OutputStream> streamMap = new 
ConcurrentHashMap<>();
     private final Queue<ContentClaim> queue = new LinkedList<>();
     private final int bufferSize;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/TaskTerminationInputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/TaskTerminationInputStream.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/TaskTerminationInputStream.java
new file mode 100644
index 0000000..ffbe592
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/TaskTerminationInputStream.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.controller.lifecycle.TaskTermination;
+import org.apache.nifi.processor.exception.TerminatedTaskException;
+
+public class TaskTerminationInputStream extends InputStream {
+    private final TaskTermination taskTermination;
+    private final InputStream delegate;
+    private final Runnable terminatedCallback;
+
+    public TaskTerminationInputStream(final InputStream delegate, final 
TaskTermination taskTermination, final Runnable terminatedCallback) {
+        this.delegate = delegate;
+        this.taskTermination = taskTermination;
+        this.terminatedCallback = terminatedCallback;
+    }
+
+    private void verifyNotTerminated() {
+        if (taskTermination.isTerminated()) {
+            final TerminatedTaskException tte = new TerminatedTaskException();
+
+            if (terminatedCallback != null) {
+                try {
+                    terminatedCallback.run();
+                } catch (final Exception e) {
+                    tte.addSuppressed(e);
+                }
+            }
+
+            throw tte;
+        }
+    }
+
+    @Override
+    public int read() throws IOException {
+        verifyNotTerminated();
+        return delegate.read();
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        verifyNotTerminated();
+        return delegate.read(b);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        verifyNotTerminated();
+        return delegate.read(b, off, len);
+    }
+
+    @Override
+    public int available() throws IOException {
+        verifyNotTerminated();
+        return delegate.available();
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        verifyNotTerminated();
+        return delegate.skip(n);
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+        verifyNotTerminated();
+        delegate.reset();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+        verifyNotTerminated();
+        delegate.mark(readlimit);
+    }
+
+    @Override
+    public boolean markSupported() {
+        verifyNotTerminated();
+        return delegate.markSupported();
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            delegate.close();
+        } catch (final Exception e) {
+            if (taskTermination.isTerminated()) {
+                final TerminatedTaskException tte = new 
TerminatedTaskException();
+                tte.addSuppressed(e);
+
+                if (terminatedCallback != null) {
+                    try {
+                        terminatedCallback.run();
+                    } catch (final Exception callbackException) {
+                        tte.addSuppressed(callbackException);
+                    }
+                }
+
+                throw tte;
+            }
+        }
+
+        verifyNotTerminated();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/TaskTerminationOutputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/TaskTerminationOutputStream.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/TaskTerminationOutputStream.java
new file mode 100644
index 0000000..f76199d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/TaskTerminationOutputStream.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.controller.lifecycle.TaskTermination;
+import org.apache.nifi.processor.exception.TerminatedTaskException;
+
+public class TaskTerminationOutputStream extends OutputStream {
+    private final TaskTermination taskTermination;
+    private final OutputStream delegate;
+    private final Runnable terminatedCallback;
+
+    public TaskTerminationOutputStream(final OutputStream delegate, final 
TaskTermination taskTermination, final Runnable terminatedCallback) {
+        this.delegate = delegate;
+        this.taskTermination = taskTermination;
+        this.terminatedCallback = terminatedCallback;
+    }
+
+    private void verifyNotTerminated() {
+        if (taskTermination.isTerminated()) {
+            final TerminatedTaskException tte = new TerminatedTaskException();
+
+            if (terminatedCallback != null) {
+                try {
+                    terminatedCallback.run();
+                } catch (final Exception e) {
+                    tte.addSuppressed(e);
+                }
+            }
+
+            throw tte;
+        }
+    }
+
+    @Override
+    public void write(final int b) throws IOException {
+        verifyNotTerminated();
+        delegate.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        verifyNotTerminated();
+        delegate.write(b, off, len);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        verifyNotTerminated();
+        delegate.write(b);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        verifyNotTerminated();
+        delegate.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            delegate.close();
+        } catch (final Exception e) {
+            if (taskTermination.isTerminated()) {
+                final TerminatedTaskException tte = new 
TerminatedTaskException();
+                tte.addSuppressed(e);
+
+                if (terminatedCallback != null) {
+                    try {
+                        terminatedCallback.run();
+                    } catch (final Exception callbackException) {
+                        tte.addSuppressed(callbackException);
+                    }
+                }
+
+                throw tte;
+            }
+        }
+
+        verifyNotTerminated();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java
index 8f36e1e..d6380ec 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java
@@ -22,10 +22,10 @@ import org.apache.nifi.engine.FlowEngine;
 
 /**
  * Base implementation of the {@link SchedulingAgent} which encapsulates the
- * updates to the {@link ScheduleState} based on invoked operation and then
+ * updates to the {@link LifecycleState} based on invoked operation and then
  * delegates to the corresponding 'do' methods. For example; By invoking
- * {@link #schedule(Connectable, ScheduleState)} the
- * {@link ScheduleState#setScheduled(boolean)} with value 'true' will be
+ * {@link #schedule(Connectable, LifecycleState)} the
+ * {@link LifecycleState#setScheduled(boolean)} with value 'true' will be
  * invoked.
  *
  * @see EventDrivenSchedulingAgent
@@ -41,70 +41,70 @@ abstract class AbstractSchedulingAgent implements 
SchedulingAgent {
     }
 
     @Override
-    public void schedule(Connectable connectable, ScheduleState scheduleState) 
{
+    public void schedule(Connectable connectable, LifecycleState 
scheduleState) {
         scheduleState.setScheduled(true);
         this.doSchedule(connectable, scheduleState);
     }
 
     @Override
-    public void unschedule(Connectable connectable, ScheduleState 
scheduleState) {
+    public void unschedule(Connectable connectable, LifecycleState 
scheduleState) {
         scheduleState.setScheduled(false);
         this.doUnschedule(connectable, scheduleState);
     }
 
     @Override
-    public void schedule(ReportingTaskNode taskNode, ScheduleState 
scheduleState) {
+    public void schedule(ReportingTaskNode taskNode, LifecycleState 
scheduleState) {
         scheduleState.setScheduled(true);
         this.doSchedule(taskNode, scheduleState);
     }
 
     @Override
-    public void unschedule(ReportingTaskNode taskNode, ScheduleState 
scheduleState) {
+    public void unschedule(ReportingTaskNode taskNode, LifecycleState 
scheduleState) {
         scheduleState.setScheduled(false);
         this.doUnschedule(taskNode, scheduleState);
     }
 
     /**
-     * Schedules the provided {@link Connectable}. Its {@link ScheduleState}
+     * Schedules the provided {@link Connectable}. Its {@link LifecycleState}
      * will be set to <i>true</i>
      *
      * @param connectable
      *            the instance of {@link Connectable}
      * @param scheduleState
-     *            the instance of {@link ScheduleState}
+     *            the instance of {@link LifecycleState}
      */
-    protected abstract void doSchedule(Connectable connectable, ScheduleState 
scheduleState);
+    protected abstract void doSchedule(Connectable connectable, LifecycleState 
scheduleState);
 
     /**
-     * Unschedules the provided {@link Connectable}. Its {@link ScheduleState}
+     * Unschedules the provided {@link Connectable}. Its {@link LifecycleState}
      * will be set to <i>false</i>
      *
      * @param connectable
      *            the instance of {@link Connectable}
      * @param scheduleState
-     *            the instance of {@link ScheduleState}
+     *            the instance of {@link LifecycleState}
      */
-    protected abstract void doUnschedule(Connectable connectable, 
ScheduleState scheduleState);
+    protected abstract void doUnschedule(Connectable connectable, 
LifecycleState scheduleState);
 
     /**
      * Schedules the provided {@link ReportingTaskNode}. Its
-     * {@link ScheduleState} will be set to <i>true</i>
+     * {@link LifecycleState} will be set to <i>true</i>
      *
      * @param connectable
      *            the instance of {@link ReportingTaskNode}
      * @param scheduleState
-     *            the instance of {@link ScheduleState}
+     *            the instance of {@link LifecycleState}
      */
-    protected abstract void doSchedule(ReportingTaskNode connectable, 
ScheduleState scheduleState);
+    protected abstract void doSchedule(ReportingTaskNode connectable, 
LifecycleState scheduleState);
 
     /**
      * Unschedules the provided {@link ReportingTaskNode}. Its
-     * {@link ScheduleState} will be set to <i>false</i>
+     * {@link LifecycleState} will be set to <i>false</i>
      *
      * @param connectable
      *            the instance of {@link ReportingTaskNode}
      * @param scheduleState
-     *            the instance of {@link ScheduleState}
+     *            the instance of {@link LifecycleState}
      */
-    protected abstract void doUnschedule(ReportingTaskNode connectable, 
ScheduleState scheduleState);
+    protected abstract void doUnschedule(ReportingTaskNode connectable, 
LifecycleState scheduleState);
 }

Reply via email to