http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProcessContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProcessContext.java deleted file mode 100644 index 4d0d850..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProcessContext.java +++ /dev/null @@ -1,273 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.repository; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.nifi.connectable.Connectable; -import org.apache.nifi.connectable.ConnectableType; -import org.apache.nifi.connectable.Connection; -import org.apache.nifi.controller.ProcessorNode; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.provenance.ProvenanceEventRepository; -import org.apache.nifi.util.Connectables; - -/** - * - */ -public class ProcessContext { - - private final Connectable connectable; - private final ContentRepository contentRepo; - private final FlowFileRepository flowFileRepo; - private final FlowFileEventRepository flowFileEventRepo; - private final CounterRepository counterRepo; - private final ProvenanceEventRepository provenanceRepo; - private final AtomicLong connectionIndex; - - public ProcessContext(final Connectable connectable, final AtomicLong connectionIndex, final ContentRepository contentRepository, - final FlowFileRepository flowFileRepository, final FlowFileEventRepository flowFileEventRepository, - final CounterRepository counterRepository, final ProvenanceEventRepository provenanceRepository) { - this.connectable = connectable; - contentRepo = contentRepository; - flowFileRepo = flowFileRepository; - flowFileEventRepo = flowFileEventRepository; - counterRepo = counterRepository; - provenanceRepo = provenanceRepository; - - this.connectionIndex = connectionIndex; - } - - Connectable getConnectable() { - return connectable; - } - - /** - * - * @param relationship relationship - * @return connections for relationship - */ - Collection<Connection> getConnections(final Relationship relationship) { - Collection<Connection> collection = connectable.getConnections(relationship); - if (collection == null) { - collection = new ArrayList<>(); - } - return Collections.unmodifiableCollection(collection); - } - - /** - * @return an unmodifiable list containing a copy of all incoming connections for the processor from which FlowFiles are allowed to be pulled - */ - List<Connection> getPollableConnections() { - if (pollFromSelfLoopsOnly()) { - final List<Connection> selfLoops = new ArrayList<>(); - for (final Connection connection : connectable.getIncomingConnections()) { - if (connection.getSource() == connection.getDestination()) { - selfLoops.add(connection); - } - } - - return selfLoops; - } else { - return connectable.getIncomingConnections(); - } - } - - private boolean isTriggerWhenAnyDestinationAvailable() { - if (connectable.getConnectableType() != ConnectableType.PROCESSOR) { - return false; - } - - final ProcessorNode procNode = (ProcessorNode) connectable; - return procNode.isTriggerWhenAnyDestinationAvailable(); - } - - /** - * @return true if we are allowed to take FlowFiles only from self-loops. This is the case when no Relationships are available except for self-looping Connections - */ - private boolean pollFromSelfLoopsOnly() { - if (isTriggerWhenAnyDestinationAvailable()) { - // we can pull from any incoming connection, as long as at least one downstream connection - // is available for each relationship. - // I.e., we can poll only from self if no relationships are available - return !Connectables.anyRelationshipAvailable(connectable); - } else { - for (final Connection connection : connectable.getConnections()) { - // A downstream connection is full. We are only allowed to pull from self-loops. - if (connection.getFlowFileQueue().isFull()) { - return true; - } - } - } - - return false; - } - - void adjustCounter(final String name, final long delta) { - final String localContext = connectable.getName() + " (" + connectable.getIdentifier() + ")"; - final String globalContext = "All " + connectable.getComponentType() + "'s"; - - counterRepo.adjustCounter(localContext, name, delta); - counterRepo.adjustCounter(globalContext, name, delta); - } - - ContentRepository getContentRepository() { - return contentRepo; - } - - FlowFileRepository getFlowFileRepository() { - return flowFileRepo; - } - - public FlowFileEventRepository getFlowFileEventRepository() { - return flowFileEventRepo; - } - - ProvenanceEventRepository getProvenanceRepository() { - return provenanceRepo; - } - - long getNextFlowFileSequence() { - return flowFileRepo.getNextFlowFileSequence(); - } - - int getNextIncomingConnectionIndex() { - final int numIncomingConnections = connectable.getIncomingConnections().size(); - return (int) (connectionIndex.getAndIncrement() % Math.max(1, numIncomingConnections)); - } - - public boolean isAnyRelationshipAvailable() { - for (final Relationship relationship : getConnectable().getRelationships()) { - final Collection<Connection> connections = getConnections(relationship); - - boolean available = true; - for (final Connection connection : connections) { - if (connection.getFlowFileQueue().isFull()) { - available = false; - break; - } - } - - if (available) { - return true; - } - } - - return false; - } - - public int getAvailableRelationshipCount() { - int count = 0; - for (final Relationship relationship : connectable.getRelationships()) { - final Collection<Connection> connections = connectable.getConnections(relationship); - if (connections == null || connections.isEmpty()) { - count++; - } else { - boolean available = true; - for (final Connection connection : connections) { - // consider self-loops available - if (connection.getSource() == connection.getDestination()) { - continue; - } - - if (connection.getFlowFileQueue().isFull()) { - available = false; - break; - } - } - - if (available) { - count++; - } - } - } - - return count; - } - - /** - * A Relationship is said to be Available if and only if all Connections for that Relationship are either self-loops or have non-full queues. - * - * @param requiredNumber minimum number of relationships that must have availability - * @return Checks if at least <code>requiredNumber</code> of Relationationships are "available." If so, returns <code>true</code>, otherwise returns <code>false</code> - */ - public boolean isRelationshipAvailabilitySatisfied(final int requiredNumber) { - int unavailable = 0; - - final Collection<Relationship> allRelationships = connectable.getRelationships(); - final int numRelationships = allRelationships.size(); - - // the maximum number of Relationships that can be unavailable and still return true. - final int maxUnavailable = numRelationships - requiredNumber; - - for (final Relationship relationship : allRelationships) { - final Collection<Connection> connections = connectable.getConnections(relationship); - if (connections != null && !connections.isEmpty()) { - boolean available = true; - for (final Connection connection : connections) { - // consider self-loops available - if (connection.getSource() == connection.getDestination()) { - continue; - } - - if (connection.getFlowFileQueue().isFull()) { - available = false; - break; - } - } - - if (!available) { - unavailable++; - if (unavailable > maxUnavailable) { - return false; - } - } - } - } - - return true; - } - - public Set<Relationship> getAvailableRelationships() { - final Set<Relationship> set = new HashSet<>(); - for (final Relationship relationship : getConnectable().getRelationships()) { - final Collection<Connection> connections = getConnections(relationship); - if (connections.isEmpty()) { - set.add(relationship); - } else { - boolean available = true; - for (final Connection connection : connections) { - if (connection.getFlowFileQueue().isFull()) { - available = false; - } - } - - if (available) { - set.add(relationship); - } - } - } - - return set; - } -}
http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java new file mode 100644 index 0000000..a407371 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.connectable.ConnectableType; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.util.Connectables; + +/** + * + */ +public class RepositoryContext { + + private final Connectable connectable; + private final ContentRepository contentRepo; + private final FlowFileRepository flowFileRepo; + private final FlowFileEventRepository flowFileEventRepo; + private final CounterRepository counterRepo; + private final ProvenanceEventRepository provenanceRepo; + private final AtomicLong connectionIndex; + + public RepositoryContext(final Connectable connectable, final AtomicLong connectionIndex, final ContentRepository contentRepository, + final FlowFileRepository flowFileRepository, final FlowFileEventRepository flowFileEventRepository, + final CounterRepository counterRepository, final ProvenanceEventRepository provenanceRepository) { + this.connectable = connectable; + contentRepo = contentRepository; + flowFileRepo = flowFileRepository; + flowFileEventRepo = flowFileEventRepository; + counterRepo = counterRepository; + provenanceRepo = provenanceRepository; + + this.connectionIndex = connectionIndex; + } + + Connectable getConnectable() { + return connectable; + } + + /** + * + * @param relationship relationship + * @return connections for relationship + */ + Collection<Connection> getConnections(final Relationship relationship) { + Collection<Connection> collection = connectable.getConnections(relationship); + if (collection == null) { + collection = new ArrayList<>(); + } + return Collections.unmodifiableCollection(collection); + } + + /** + * @return an unmodifiable list containing a copy of all incoming connections for the processor from which FlowFiles are allowed to be pulled + */ + List<Connection> getPollableConnections() { + if (pollFromSelfLoopsOnly()) { + final List<Connection> selfLoops = new ArrayList<>(); + for (final Connection connection : connectable.getIncomingConnections()) { + if (connection.getSource() == connection.getDestination()) { + selfLoops.add(connection); + } + } + + return selfLoops; + } else { + return connectable.getIncomingConnections(); + } + } + + private boolean isTriggerWhenAnyDestinationAvailable() { + if (connectable.getConnectableType() != ConnectableType.PROCESSOR) { + return false; + } + + final ProcessorNode procNode = (ProcessorNode) connectable; + return procNode.isTriggerWhenAnyDestinationAvailable(); + } + + /** + * @return true if we are allowed to take FlowFiles only from self-loops. This is the case when no Relationships are available except for self-looping Connections + */ + private boolean pollFromSelfLoopsOnly() { + if (isTriggerWhenAnyDestinationAvailable()) { + // we can pull from any incoming connection, as long as at least one downstream connection + // is available for each relationship. + // I.e., we can poll only from self if no relationships are available + return !Connectables.anyRelationshipAvailable(connectable); + } else { + for (final Connection connection : connectable.getConnections()) { + // A downstream connection is full. We are only allowed to pull from self-loops. + if (connection.getFlowFileQueue().isFull()) { + return true; + } + } + } + + return false; + } + + void adjustCounter(final String name, final long delta) { + final String localContext = connectable.getName() + " (" + connectable.getIdentifier() + ")"; + final String globalContext = "All " + connectable.getComponentType() + "'s"; + + counterRepo.adjustCounter(localContext, name, delta); + counterRepo.adjustCounter(globalContext, name, delta); + } + + ContentRepository getContentRepository() { + return contentRepo; + } + + FlowFileRepository getFlowFileRepository() { + return flowFileRepo; + } + + public FlowFileEventRepository getFlowFileEventRepository() { + return flowFileEventRepo; + } + + ProvenanceEventRepository getProvenanceRepository() { + return provenanceRepo; + } + + long getNextFlowFileSequence() { + return flowFileRepo.getNextFlowFileSequence(); + } + + int getNextIncomingConnectionIndex() { + final int numIncomingConnections = connectable.getIncomingConnections().size(); + return (int) (connectionIndex.getAndIncrement() % Math.max(1, numIncomingConnections)); + } + + public boolean isAnyRelationshipAvailable() { + for (final Relationship relationship : getConnectable().getRelationships()) { + final Collection<Connection> connections = getConnections(relationship); + + boolean available = true; + for (final Connection connection : connections) { + if (connection.getFlowFileQueue().isFull()) { + available = false; + break; + } + } + + if (available) { + return true; + } + } + + return false; + } + + public int getAvailableRelationshipCount() { + int count = 0; + for (final Relationship relationship : connectable.getRelationships()) { + final Collection<Connection> connections = connectable.getConnections(relationship); + if (connections == null || connections.isEmpty()) { + count++; + } else { + boolean available = true; + for (final Connection connection : connections) { + // consider self-loops available + if (connection.getSource() == connection.getDestination()) { + continue; + } + + if (connection.getFlowFileQueue().isFull()) { + available = false; + break; + } + } + + if (available) { + count++; + } + } + } + + return count; + } + + /** + * A Relationship is said to be Available if and only if all Connections for that Relationship are either self-loops or have non-full queues. + * + * @param requiredNumber minimum number of relationships that must have availability + * @return Checks if at least <code>requiredNumber</code> of Relationationships are "available." If so, returns <code>true</code>, otherwise returns <code>false</code> + */ + public boolean isRelationshipAvailabilitySatisfied(final int requiredNumber) { + int unavailable = 0; + + final Collection<Relationship> allRelationships = connectable.getRelationships(); + final int numRelationships = allRelationships.size(); + + // the maximum number of Relationships that can be unavailable and still return true. + final int maxUnavailable = numRelationships - requiredNumber; + + for (final Relationship relationship : allRelationships) { + final Collection<Connection> connections = connectable.getConnections(relationship); + if (connections != null && !connections.isEmpty()) { + boolean available = true; + for (final Connection connection : connections) { + // consider self-loops available + if (connection.getSource() == connection.getDestination()) { + continue; + } + + if (connection.getFlowFileQueue().isFull()) { + available = false; + break; + } + } + + if (!available) { + unavailable++; + if (unavailable > maxUnavailable) { + return false; + } + } + } + } + + return true; + } + + public Set<Relationship> getAvailableRelationships() { + final Set<Relationship> set = new HashSet<>(); + for (final Relationship relationship : getConnectable().getRelationships()) { + final Collection<Connection> connections = getConnections(relationship); + if (connections.isEmpty()) { + set.add(relationship); + } else { + boolean available = true; + for (final Connection connection : connections) { + if (connection.getFlowFileQueue().isFull()) { + available = false; + } + } + + if (available) { + set.add(relationship); + } + } + } + + return set; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 24fbf72..eed4dbe 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -16,10 +16,38 @@ */ package org.apache.nifi.controller.repository; -import org.apache.commons.io.IOUtils; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.lifecycle.TaskTermination; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.repository.claim.ContentClaim; @@ -30,6 +58,8 @@ import org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream; import org.apache.nifi.controller.repository.io.FlowFileAccessInputStream; import org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream; import org.apache.nifi.controller.repository.io.LimitedInputStream; +import org.apache.nifi.controller.repository.io.TaskTerminationInputStream; +import org.apache.nifi.controller.repository.io.TaskTerminationOutputStream; import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -40,6 +70,7 @@ import org.apache.nifi.processor.exception.FlowFileAccessException; import org.apache.nifi.processor.exception.FlowFileHandlingException; import org.apache.nifi.processor.exception.MissingFlowFileException; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.exception.TerminatedTaskException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.io.StreamCallback; @@ -55,33 +86,6 @@ import org.apache.nifi.stream.io.StreamUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedOutputStream; -import java.io.ByteArrayInputStream; -import java.io.Closeable; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Objects; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - /** * <p> * Provides a ProcessSession that ensures all accesses, changes and transfers @@ -106,11 +110,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private static final Logger claimLog = LoggerFactory.getLogger(StandardProcessSession.class.getSimpleName() + ".claims"); private static final int MAX_ROLLBACK_FLOWFILES_TO_LOG = 5; - private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>(); - private final Map<String, StandardFlowFileEvent> connectionCounts = new HashMap<>(); - private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>(); - private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new HashMap<>(); - private final ProcessContext context; + private final Map<FlowFileRecord, StandardRepositoryRecord> records = new ConcurrentHashMap<>(); + private final Map<String, StandardFlowFileEvent> connectionCounts = new ConcurrentHashMap<>(); + private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new ConcurrentHashMap<>(); + private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new ConcurrentHashMap<>(); + private final RepositoryContext context; + private final TaskTermination taskTermination; private final Map<FlowFile, Integer> readRecursionSet = new HashMap<>();// set used to track what is currently being operated on to prevent logic failures if recursive calls occurring private final Set<FlowFile> writeRecursionSet = new HashSet<>(); private final Map<FlowFile, Path> deleteOnCommit = new HashMap<>(); @@ -137,9 +142,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private long processingStartTime; // List of InputStreams that have been opened by calls to {@link #read(FlowFile)} and not yet closed - private final Map<FlowFile, InputStream> openInputStreams = new HashMap<>(); + private final Map<FlowFile, InputStream> openInputStreams = new ConcurrentHashMap<>(); // List of OutputStreams that have been opened by calls to {@link #write(FlowFile)} and not yet closed - private final Map<FlowFile, OutputStream> openOutputStreams = new HashMap<>(); + private final Map<FlowFile, OutputStream> openOutputStreams = new ConcurrentHashMap<>(); // maps a FlowFile to all Provenance Events that were generated for that FlowFile. // we do this so that if we generate a Fork event, for example, and then remove the event in the same @@ -153,8 +158,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private Checkpoint checkpoint = new Checkpoint(); private final ContentClaimWriteCache claimCache; - public StandardProcessSession(final ProcessContext context) { + public StandardProcessSession(final RepositoryContext context, final TaskTermination taskTermination) { this.context = context; + this.taskTermination = taskTermination; final Connectable connectable = context.getConnectable(); final String componentType; @@ -194,6 +200,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE processingStartTime = System.nanoTime(); } + private void verifyTaskActive() { + if (taskTermination.isTerminated()) { + rollback(false, true); + throw new TerminatedTaskException(); + } + } + private void closeStreams(final Map<FlowFile, ? extends Closeable> streamMap, final String action, final String streamType) { final Map<FlowFile, ? extends Closeable> openStreamCopy = new HashMap<>(streamMap); // avoid ConcurrentModificationException by creating a copy of the List for (final Map.Entry<FlowFile, ? extends Closeable> entry : openStreamCopy.entrySet()) { @@ -212,7 +225,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } public void checkpoint() { - + verifyTaskActive(); resetWriteClaims(false); closeStreams(openInputStreams, "committed", "input"); @@ -318,6 +331,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void commit() { + verifyTaskActive(); checkpoint(); commit(this.checkpoint); this.checkpoint = null; @@ -783,6 +797,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public StandardProvenanceEventRecord enrich(final ProvenanceEventRecord rawEvent, final FlowFile flowFile) { + verifyTaskActive(); + final StandardRepositoryRecord repoRecord = records.get(flowFile); if (repoRecord == null) { throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")"); @@ -927,6 +943,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void rollback(final boolean penalize) { rollback(penalize, false); + verifyTaskActive(); } private void rollback(final boolean penalize, final boolean rollbackCheckpoint) { @@ -1153,6 +1170,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void migrate(final ProcessSession newOwner, final Collection<FlowFile> flowFiles) { + verifyTaskActive(); + if (Objects.requireNonNull(newOwner) == this) { throw new IllegalArgumentException("Cannot migrate FlowFiles from a Process Session to itself"); } @@ -1469,6 +1488,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void adjustCounter(final String name, final long delta, final boolean immediate) { + verifyTaskActive(); + final Map<String, Long> counters; if (immediate) { if (immediateCounters == null) { @@ -1501,6 +1522,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public FlowFile get() { + verifyTaskActive(); final List<Connection> connections = context.getPollableConnections(); final int numConnections = connections.size(); for (int numAttempts = 0; numAttempts < numConnections; numAttempts++) { @@ -1520,6 +1542,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public List<FlowFile> get(final int maxResults) { + verifyTaskActive(); + if (maxResults < 0) { throw new IllegalArgumentException(); } @@ -1554,6 +1578,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public List<FlowFile> get(final FlowFileFilter filter) { + verifyTaskActive(); + return get(new ConnectionPoller() { @Override public List<FlowFileRecord> poll(final Connection connection, final Set<FlowFileRecord> expiredRecords) { @@ -1562,31 +1588,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE }, true); } - private List<FlowFile> get(final Connection connection, final ConnectionPoller poller, final boolean lockQueue) { - if (lockQueue) { - connection.lock(); - } - - try { - final Set<FlowFileRecord> expired = new HashSet<>(); - final List<FlowFileRecord> newlySelected = poller.poll(connection, expired); - removeExpired(expired, connection); - - if (newlySelected.isEmpty() && expired.isEmpty()) { - return new ArrayList<>(); - } - - for (final FlowFileRecord flowFile : newlySelected) { - registerDequeuedRecord(flowFile, connection); - } - - return new ArrayList<>(newlySelected); - } finally { - if (lockQueue) { - connection.unlock(); - } - } - } private List<FlowFile> get(final ConnectionPoller poller, final boolean lockAllQueues) { final List<Connection> connections = context.getPollableConnections(); @@ -1630,6 +1631,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public QueueSize getQueueSize() { + verifyTaskActive(); + int flowFileCount = 0; long byteCount = 0L; for (final Connection conn : context.getPollableConnections()) { @@ -1642,6 +1645,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public FlowFile create() { + verifyTaskActive(); + final Map<String, String> attrs = new HashMap<>(); attrs.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime())); attrs.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH); @@ -1659,12 +1664,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public FlowFile clone(FlowFile example) { + verifyTaskActive(); example = validateRecordState(example); return clone(example, 0L, example.getSize()); } @Override public FlowFile clone(FlowFile example, final long offset, final long size) { + verifyTaskActive(); + example = validateRecordState(example); final StandardRepositoryRecord exampleRepoRecord = records.get(example); final FlowFileRecord currRec = exampleRepoRecord.getCurrent(); @@ -1734,6 +1742,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public FlowFile penalize(FlowFile flowFile) { + verifyTaskActive(); + flowFile = validateRecordState(flowFile); final StandardRepositoryRecord record = records.get(flowFile); final long expirationEpochMillis = System.currentTimeMillis() + context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS); @@ -1744,6 +1754,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public FlowFile putAttribute(FlowFile flowFile, final String key, final String value) { + verifyTaskActive(); flowFile = validateRecordState(flowFile); if (CoreAttributes.UUID.key().equals(key)) { @@ -1759,6 +1770,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public FlowFile putAllAttributes(FlowFile flowFile, final Map<String, String> attributes) { + verifyTaskActive(); + flowFile = validateRecordState(flowFile); final StandardRepositoryRecord record = records.get(flowFile); @@ -1779,6 +1792,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public FlowFile removeAttribute(FlowFile flowFile, final String key) { + verifyTaskActive(); flowFile = validateRecordState(flowFile); if (CoreAttributes.UUID.key().equals(key)) { @@ -1793,6 +1807,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public FlowFile removeAllAttributes(FlowFile flowFile, final Set<String> keys) { + verifyTaskActive(); flowFile = validateRecordState(flowFile); if (keys == null) { @@ -1817,6 +1832,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public FlowFile removeAllAttributes(FlowFile flowFile, final Pattern keyPattern) { + verifyTaskActive(); + flowFile = validateRecordState(flowFile); final StandardRepositoryRecord record = records.get(flowFile); final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keyPattern).build(); @@ -1851,6 +1868,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void transfer(FlowFile flowFile, final Relationship relationship) { + verifyTaskActive(); flowFile = validateRecordState(flowFile); final int numDestinations = context.getConnections(relationship).size(); final int multiplier = Math.max(1, numDestinations); @@ -1881,6 +1899,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void transfer(FlowFile flowFile) { + verifyTaskActive(); + flowFile = validateRecordState(flowFile); final StandardRepositoryRecord record = records.get(flowFile); if (record.getOriginalQueue() == null) { @@ -1899,6 +1919,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void transfer(Collection<FlowFile> flowFiles, final Relationship relationship) { + verifyTaskActive(); flowFiles = validateRecordState(flowFiles); boolean autoTerminated = false; @@ -1936,6 +1957,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void remove(FlowFile flowFile) { + verifyTaskActive(); + flowFile = validateRecordState(flowFile); final StandardRepositoryRecord record = records.get(flowFile); record.markForDelete(); @@ -1957,6 +1980,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void remove(Collection<FlowFile> flowFiles) { + verifyTaskActive(); + flowFiles = validateRecordState(flowFiles); for (final FlowFile flowFile : flowFiles) { final StandardRepositoryRecord record = records.get(flowFile); @@ -2126,7 +2151,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE try { StreamUtils.skip(rawInStream, offset); } catch(IOException ioe) { - IOUtils.closeQuietly(rawInStream); + try { + rawInStream.close(); + } catch (final Exception e) { + ioe.addSuppressed(ioe); + } + throw ioe; } return rawInStream; @@ -2147,6 +2177,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void read(FlowFile source, boolean allowSessionStreamManagement, InputStreamCallback reader) { + verifyTaskActive(); + source = validateRecordState(source, true); final StandardRepositoryRecord record = records.get(source); @@ -2172,7 +2204,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE try { incrementReadCount(source); - reader.process(ffais); + reader.process(createTaskTerminationStream(ffais)); // Allow processors to close the file after reading to avoid too many files open or do smart session stream management. if (this.currentReadClaimStream != null && !allowSessionStreamManagement) { @@ -2201,6 +2233,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public InputStream read(FlowFile source) { + verifyTaskActive(); + source = validateRecordState(source, true); final StandardRepositoryRecord record = records.get(source); @@ -2302,7 +2336,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE incrementReadCount(sourceFlowFile); openInputStreams.put(sourceFlowFile, errorHandlingStream); - return errorHandlingStream; + + return createTaskTerminationStream(errorHandlingStream); + } + + private InputStream createTaskTerminationStream(final InputStream delegate) { + return new TaskTerminationInputStream(delegate, taskTermination, () -> rollback(false, true)); + } + + private OutputStream createTaskTerminationStream(final OutputStream delegate) { + return new TaskTerminationOutputStream(delegate, taskTermination, () -> rollback(false, true)); } private void incrementReadCount(final FlowFile flowFile) { @@ -2325,11 +2368,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public FlowFile merge(final Collection<FlowFile> sources, final FlowFile destination) { + verifyTaskActive(); + return merge(sources, destination, null, null, null); } @Override public FlowFile merge(Collection<FlowFile> sources, FlowFile destination, final byte[] header, final byte[] footer, final byte[] demarcator) { + verifyTaskActive(); + sources = validateRecordState(sources); destination = validateRecordState(destination); if (sources.contains(destination)) { @@ -2431,6 +2478,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public OutputStream write(FlowFile source) { + verifyTaskActive(); source = validateRecordState(source); final StandardRepositoryRecord record = records.get(source); @@ -2530,7 +2578,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE writeRecursionSet.add(source); openOutputStreams.put(source, errorHandlingOutputStream); - return errorHandlingOutputStream; + return createTaskTerminationStream(errorHandlingOutputStream); } catch (final ContentNotFoundException nfe) { resetWriteClaims(); // need to reset write claim before we can remove the claim destroyContent(newClaim); @@ -2553,6 +2601,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public FlowFile write(FlowFile source, final OutputStreamCallback writer) { + verifyTaskActive(); source = validateRecordState(source); final StandardRepositoryRecord record = records.get(source); @@ -2568,7 +2617,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnClose)) { try { writeRecursionSet.add(source); - writer.process(new FlowFileAccessOutputStream(countingOut, source)); + final OutputStream ffaos = new FlowFileAccessOutputStream(countingOut, source); + writer.process(createTaskTerminationStream(ffaos)); } finally { writtenToFlowFile = countingOut.getBytesWritten(); bytesWritten += countingOut.getBytesWritten(); @@ -2609,6 +2659,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public FlowFile append(FlowFile source, final OutputStreamCallback writer) { + verifyTaskActive(); + source = validateRecordState(source); final StandardRepositoryRecord record = records.get(source); long newSize = 0L; @@ -2616,7 +2668,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // Get the current Content Claim from the record and see if we already have // an OutputStream that we can append to. final ContentClaim oldClaim = record.getCurrentClaim(); - ByteCountingOutputStream outStream = appendableStreams.get(oldClaim); + ByteCountingOutputStream outStream = oldClaim == null ? null : appendableStreams.get(oldClaim); long originalByteWrittenCount = 0; ContentClaim newClaim = null; @@ -2789,6 +2841,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public FlowFile write(FlowFile source, final StreamCallback writer) { + verifyTaskActive(); source = validateRecordState(source); final StandardRepositoryRecord record = records.get(source); final ContentClaim currClaim = record.getCurrentClaim(); @@ -2821,10 +2874,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it. final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingIn, source, currClaim); + final FlowFileAccessOutputStream ffaos = new FlowFileAccessOutputStream(countingOut, source); boolean cnfeThrown = false; try { - writer.process(ffais, new FlowFileAccessOutputStream(countingOut, source)); + writer.process(createTaskTerminationStream(ffais), createTaskTerminationStream(ffaos)); } catch (final ContentNotFoundException cnfe) { cnfeThrown = true; throw cnfe; @@ -2868,6 +2922,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public FlowFile importFrom(final Path source, final boolean keepSourceFile, FlowFile destination) { + verifyTaskActive(); + destination = validateRecordState(destination); // TODO: find a better solution. With Windows 7 and Java 7 (very early update, at least), Files.isWritable(source.getParent()) returns false, even when it should be true. if (!keepSourceFile && !Files.isWritable(source.getParent()) && !source.getParent().toFile().canWrite()) { @@ -2918,6 +2974,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public FlowFile importFrom(final InputStream source, FlowFile destination) { + verifyTaskActive(); + destination = validateRecordState(destination); final StandardRepositoryRecord record = records.get(destination); ContentClaim newClaim = null; @@ -2929,7 +2987,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant()); claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination); - newSize = context.getContentRepository().importFrom(source, newClaim); + newSize = context.getContentRepository().importFrom(createTaskTerminationStream(source), newClaim); bytesWritten += newSize; } catch (final IOException e) { throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e); @@ -2955,6 +3013,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void exportTo(FlowFile source, final Path destination, final boolean append) { + verifyTaskActive(); source = validateRecordState(source); final StandardRepositoryRecord record = records.get(source); try { @@ -2973,6 +3032,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void exportTo(FlowFile source, final OutputStream destination) { + verifyTaskActive(); source = validateRecordState(source); final StandardRepositoryRecord record = records.get(source); @@ -2997,25 +3057,24 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it. - final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim()); - boolean cnfeThrown = false; + try (final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim())) { + boolean cnfeThrown = false; - try { - incrementReadCount(source); - StreamUtils.copy(ffais, destination, source.getSize()); - } catch (final ContentNotFoundException cnfe) { - cnfeThrown = true; - throw cnfe; - } finally { - decrementReadCount(source); + try { + incrementReadCount(source); + StreamUtils.copy(ffais, createTaskTerminationStream(destination), source.getSize()); + } catch (final ContentNotFoundException cnfe) { + cnfeThrown = true; + throw cnfe; + } finally { + decrementReadCount(source); - IOUtils.closeQuietly(ffais); - // if cnfeThrown is true, we don't need to re-throw the Exception; it will propagate. - if (!cnfeThrown && ffais.getContentNotFoundException() != null) { - throw ffais.getContentNotFoundException(); + // if cnfeThrown is true, we don't need to re-throw the Exception; it will propagate. + if (!cnfeThrown && ffais.getContentNotFoundException() != null) { + throw ffais.getContentNotFoundException(); + } } } - } catch (final ContentNotFoundException nfe) { handleContentNotFound(nfe, record); } catch (final IOException ex) { @@ -3106,6 +3165,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public FlowFile create(FlowFile parent) { + verifyTaskActive(); parent = getMostRecent(parent); final Map<String, String> newAttributes = new HashMap<>(3); @@ -3144,6 +3204,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public FlowFile create(Collection<FlowFile> parents) { + verifyTaskActive(); + parents = parents.stream().map(this::getMostRecent).collect(Collectors.toList()); final Map<String, String> newAttributes = intersectAttributes(parents); @@ -3229,12 +3291,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override protected void finalize() throws Throwable { - rollback(); + rollback(false, false); super.finalize(); } @Override public ProvenanceReporter getProvenanceReporter() { + verifyTaskActive(); return provenanceReporter; } @@ -3261,9 +3324,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private final List<ProvenanceEventRecord> autoTerminatedEvents = new ArrayList<>(); private final Set<ProvenanceEventRecord> reportedEvents = new LinkedHashSet<>(); - private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>(); - private final Map<String, StandardFlowFileEvent> connectionCounts = new HashMap<>(); - private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>(); + private final Map<FlowFileRecord, StandardRepositoryRecord> records = new ConcurrentHashMap<>(); + private final Map<String, StandardFlowFileEvent> connectionCounts = new ConcurrentHashMap<>(); + private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new ConcurrentHashMap<>(); private Map<String, Long> countersOnCommit = new HashMap<>(); private Map<String, Long> immediateCounters = new HashMap<>(); @@ -3294,18 +3357,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE this.unacknowledgedFlowFiles.putAll(session.unacknowledgedFlowFiles); if (session.countersOnCommit != null) { - if (this.countersOnCommit == null) { - this.countersOnCommit = new HashMap<>(); - } - this.countersOnCommit.putAll(session.countersOnCommit); } if (session.immediateCounters != null) { - if (this.immediateCounters == null) { - this.immediateCounters = new HashMap<>(); - } - this.immediateCounters.putAll(session.immediateCounters); } http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSessionFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSessionFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSessionFactory.java index 76cecbb..7e4d5b0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSessionFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSessionFactory.java @@ -16,18 +16,21 @@ */ package org.apache.nifi.controller.repository; +import org.apache.nifi.controller.lifecycle.TaskTermination; import org.apache.nifi.processor.ProcessSessionFactory; public class StandardProcessSessionFactory implements ProcessSessionFactory { - private final ProcessContext context; + private final RepositoryContext context; + private final TaskTermination taskTermination; - public StandardProcessSessionFactory(final ProcessContext context) { + public StandardProcessSessionFactory(final RepositoryContext context, final TaskTermination taskTermination) { this.context = context; + this.taskTermination = taskTermination; } @Override public StandardProcessSession createSession() { - return new StandardProcessSession(context); + return new StandardProcessSession(context, taskTermination); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WeakHashMapProcessSessionFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WeakHashMapProcessSessionFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WeakHashMapProcessSessionFactory.java new file mode 100644 index 0000000..cda65bb --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WeakHashMapProcessSessionFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository; + +import java.util.Map; +import java.util.WeakHashMap; + +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.TerminatedTaskException; + +public class WeakHashMapProcessSessionFactory implements ActiveProcessSessionFactory { + private final ProcessSessionFactory delegate; + private final Map<ProcessSession, Boolean> sessionMap = new WeakHashMap<>(); + private boolean terminated = false; + + public WeakHashMapProcessSessionFactory(final ProcessSessionFactory delegate) { + this.delegate = delegate; + } + + @Override + public synchronized ProcessSession createSession() { + if (terminated) { + throw new TerminatedTaskException(); + } + + final ProcessSession session = delegate.createSession(); + sessionMap.put(session, Boolean.TRUE); + return session; + } + + @Override + public synchronized void terminateActiveSessions() { + terminated = true; + for (final ProcessSession session : sessionMap.keySet()) { + try { + session.rollback(); + } catch (final TerminatedTaskException tte) { + // ignore + } + } + + sessionMap.clear(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java index 6b608ac..bf3a870 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java @@ -20,16 +20,16 @@ package org.apache.nifi.controller.repository.claim; import java.io.BufferedOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; import org.apache.nifi.controller.repository.ContentRepository; public class ContentClaimWriteCache { private final ContentRepository contentRepo; - private final Map<ResourceClaim, OutputStream> streamMap = new HashMap<>(); + private final Map<ResourceClaim, OutputStream> streamMap = new ConcurrentHashMap<>(); private final Queue<ContentClaim> queue = new LinkedList<>(); private final int bufferSize; http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/TaskTerminationInputStream.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/TaskTerminationInputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/TaskTerminationInputStream.java new file mode 100644 index 0000000..ffbe592 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/TaskTerminationInputStream.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository.io; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.nifi.controller.lifecycle.TaskTermination; +import org.apache.nifi.processor.exception.TerminatedTaskException; + +public class TaskTerminationInputStream extends InputStream { + private final TaskTermination taskTermination; + private final InputStream delegate; + private final Runnable terminatedCallback; + + public TaskTerminationInputStream(final InputStream delegate, final TaskTermination taskTermination, final Runnable terminatedCallback) { + this.delegate = delegate; + this.taskTermination = taskTermination; + this.terminatedCallback = terminatedCallback; + } + + private void verifyNotTerminated() { + if (taskTermination.isTerminated()) { + final TerminatedTaskException tte = new TerminatedTaskException(); + + if (terminatedCallback != null) { + try { + terminatedCallback.run(); + } catch (final Exception e) { + tte.addSuppressed(e); + } + } + + throw tte; + } + } + + @Override + public int read() throws IOException { + verifyNotTerminated(); + return delegate.read(); + } + + @Override + public int read(byte[] b) throws IOException { + verifyNotTerminated(); + return delegate.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + verifyNotTerminated(); + return delegate.read(b, off, len); + } + + @Override + public int available() throws IOException { + verifyNotTerminated(); + return delegate.available(); + } + + @Override + public long skip(long n) throws IOException { + verifyNotTerminated(); + return delegate.skip(n); + } + + @Override + public synchronized void reset() throws IOException { + verifyNotTerminated(); + delegate.reset(); + } + + @Override + public synchronized void mark(int readlimit) { + verifyNotTerminated(); + delegate.mark(readlimit); + } + + @Override + public boolean markSupported() { + verifyNotTerminated(); + return delegate.markSupported(); + } + + @Override + public void close() throws IOException { + try { + delegate.close(); + } catch (final Exception e) { + if (taskTermination.isTerminated()) { + final TerminatedTaskException tte = new TerminatedTaskException(); + tte.addSuppressed(e); + + if (terminatedCallback != null) { + try { + terminatedCallback.run(); + } catch (final Exception callbackException) { + tte.addSuppressed(callbackException); + } + } + + throw tte; + } + } + + verifyNotTerminated(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/TaskTerminationOutputStream.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/TaskTerminationOutputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/TaskTerminationOutputStream.java new file mode 100644 index 0000000..f76199d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/TaskTerminationOutputStream.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository.io; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.nifi.controller.lifecycle.TaskTermination; +import org.apache.nifi.processor.exception.TerminatedTaskException; + +public class TaskTerminationOutputStream extends OutputStream { + private final TaskTermination taskTermination; + private final OutputStream delegate; + private final Runnable terminatedCallback; + + public TaskTerminationOutputStream(final OutputStream delegate, final TaskTermination taskTermination, final Runnable terminatedCallback) { + this.delegate = delegate; + this.taskTermination = taskTermination; + this.terminatedCallback = terminatedCallback; + } + + private void verifyNotTerminated() { + if (taskTermination.isTerminated()) { + final TerminatedTaskException tte = new TerminatedTaskException(); + + if (terminatedCallback != null) { + try { + terminatedCallback.run(); + } catch (final Exception e) { + tte.addSuppressed(e); + } + } + + throw tte; + } + } + + @Override + public void write(final int b) throws IOException { + verifyNotTerminated(); + delegate.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + verifyNotTerminated(); + delegate.write(b, off, len); + } + + @Override + public void write(byte[] b) throws IOException { + verifyNotTerminated(); + delegate.write(b); + } + + @Override + public void flush() throws IOException { + verifyNotTerminated(); + delegate.flush(); + } + + @Override + public void close() throws IOException { + try { + delegate.close(); + } catch (final Exception e) { + if (taskTermination.isTerminated()) { + final TerminatedTaskException tte = new TerminatedTaskException(); + tte.addSuppressed(e); + + if (terminatedCallback != null) { + try { + terminatedCallback.run(); + } catch (final Exception callbackException) { + tte.addSuppressed(callbackException); + } + } + + throw tte; + } + } + + verifyNotTerminated(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java index 8f36e1e..d6380ec 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java @@ -22,10 +22,10 @@ import org.apache.nifi.engine.FlowEngine; /** * Base implementation of the {@link SchedulingAgent} which encapsulates the - * updates to the {@link ScheduleState} based on invoked operation and then + * updates to the {@link LifecycleState} based on invoked operation and then * delegates to the corresponding 'do' methods. For example; By invoking - * {@link #schedule(Connectable, ScheduleState)} the - * {@link ScheduleState#setScheduled(boolean)} with value 'true' will be + * {@link #schedule(Connectable, LifecycleState)} the + * {@link LifecycleState#setScheduled(boolean)} with value 'true' will be * invoked. * * @see EventDrivenSchedulingAgent @@ -41,70 +41,70 @@ abstract class AbstractSchedulingAgent implements SchedulingAgent { } @Override - public void schedule(Connectable connectable, ScheduleState scheduleState) { + public void schedule(Connectable connectable, LifecycleState scheduleState) { scheduleState.setScheduled(true); this.doSchedule(connectable, scheduleState); } @Override - public void unschedule(Connectable connectable, ScheduleState scheduleState) { + public void unschedule(Connectable connectable, LifecycleState scheduleState) { scheduleState.setScheduled(false); this.doUnschedule(connectable, scheduleState); } @Override - public void schedule(ReportingTaskNode taskNode, ScheduleState scheduleState) { + public void schedule(ReportingTaskNode taskNode, LifecycleState scheduleState) { scheduleState.setScheduled(true); this.doSchedule(taskNode, scheduleState); } @Override - public void unschedule(ReportingTaskNode taskNode, ScheduleState scheduleState) { + public void unschedule(ReportingTaskNode taskNode, LifecycleState scheduleState) { scheduleState.setScheduled(false); this.doUnschedule(taskNode, scheduleState); } /** - * Schedules the provided {@link Connectable}. Its {@link ScheduleState} + * Schedules the provided {@link Connectable}. Its {@link LifecycleState} * will be set to <i>true</i> * * @param connectable * the instance of {@link Connectable} * @param scheduleState - * the instance of {@link ScheduleState} + * the instance of {@link LifecycleState} */ - protected abstract void doSchedule(Connectable connectable, ScheduleState scheduleState); + protected abstract void doSchedule(Connectable connectable, LifecycleState scheduleState); /** - * Unschedules the provided {@link Connectable}. Its {@link ScheduleState} + * Unschedules the provided {@link Connectable}. Its {@link LifecycleState} * will be set to <i>false</i> * * @param connectable * the instance of {@link Connectable} * @param scheduleState - * the instance of {@link ScheduleState} + * the instance of {@link LifecycleState} */ - protected abstract void doUnschedule(Connectable connectable, ScheduleState scheduleState); + protected abstract void doUnschedule(Connectable connectable, LifecycleState scheduleState); /** * Schedules the provided {@link ReportingTaskNode}. Its - * {@link ScheduleState} will be set to <i>true</i> + * {@link LifecycleState} will be set to <i>true</i> * * @param connectable * the instance of {@link ReportingTaskNode} * @param scheduleState - * the instance of {@link ScheduleState} + * the instance of {@link LifecycleState} */ - protected abstract void doSchedule(ReportingTaskNode connectable, ScheduleState scheduleState); + protected abstract void doSchedule(ReportingTaskNode connectable, LifecycleState scheduleState); /** * Unschedules the provided {@link ReportingTaskNode}. Its - * {@link ScheduleState} will be set to <i>false</i> + * {@link LifecycleState} will be set to <i>false</i> * * @param connectable * the instance of {@link ReportingTaskNode} * @param scheduleState - * the instance of {@link ScheduleState} + * the instance of {@link LifecycleState} */ - protected abstract void doUnschedule(ReportingTaskNode connectable, ScheduleState scheduleState); + protected abstract void doUnschedule(ReportingTaskNode connectable, LifecycleState scheduleState); }