This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 940fd8e NIFI-9847: Switched LifecycleState to use a WeakHashMap to track Acti… (#5917)
940fd8e is described below
commit 940fd8e81c91f171f5f034550fbadb47e322229b
Author: markap14 <[email protected]>
AuthorDate: Wed Mar 30 15:25:14 2022 -0400
NIFI-9847: Switched LifecycleState to use a WeakHashMap to track Acti… (#5917)
* NIFI-9847: Switched LifecycleState to use a WeakHashMap to track ActiveProcessSessionFactory instances, instead of a regular Set that removed the instance after calling onTrigger. This was necessary for processors such as MergeRecord that may stash away an ActiveProcessSessionFactory for later use, as we need to be able to force rollback on processor termination
* NIFI-9847: Fixed checkstyle violation
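For context, the pattern this commit adopts is the "WeakHashMap as a weak Set" idiom: the map values are unused, and an entry lives only as long as its key remains strongly reachable elsewhere. A minimal standalone sketch follows (illustrative only, not part of this patch; the WeakTracker name is hypothetical):

import java.util.Map;
import java.util.WeakHashMap;
import java.util.function.Consumer;

// Sketch of the idiom LifecycleState switches to: a WeakHashMap used as a
// weak Set. The value is irrelevant; only the weakly-referenced key matters.
class WeakTracker<T> {
    private final Map<T, Object> tracked = new WeakHashMap<>();

    synchronized void track(final T item) {
        tracked.put(item, null); // value unused; entry vanishes once the key is GC'd
    }

    synchronized void forEachLive(final Consumer<T> action) {
        // Anything still strongly referenced elsewhere (e.g., a session factory
        // that a processor such as MergeRecord stashed in a field) is still
        // present here, so it can be forcibly rolled back on termination.
        tracked.keySet().forEach(action);
    }
}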
---
.../nifi/controller/StandardProcessorNode.java | 2 +-
.../repository/StandardProcessSession.java | 306 +++++++++++----------
.../nifi/controller/scheduling/LifecycleState.java | 30 +-
.../controller/queue/FlowFileQueueContents.java | 5 +
.../controller/queue/SwappablePriorityQueue.java | 19 +-
.../clustered/SocketLoadBalancedFlowFileQueue.java | 26 +-
.../partition/StandardRebalancingPartition.java | 20 +-
.../scheduling/EventDrivenSchedulingAgent.java | 8 +-
.../scheduling/StandardProcessScheduler.java | 4 +-
.../nifi/controller/tasks/ConnectableTask.java | 2 +-
.../controller/tasks/ReportingTaskWrapper.java | 2 +-
.../stateless/engine/StatelessSchedulingAgent.java | 2 +-
12 files changed, 246 insertions(+), 180 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 62d8297..f743ac5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -1818,7 +1818,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
deactivateThread();
}
- scheduleState.decrementActiveThreadCount(null);
+ scheduleState.decrementActiveThreadCount();
hasActiveThreads = false;
scheduledState.set(ScheduledState.STOPPED);
future.complete(null);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 3965b86..af79fab 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1329,11 +1329,11 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
if (repoRecord.getOriginalQueue() != null && repoRecord.getOriginalQueue().getIdentifier() != null) {
details.append("queue=")
.append(repoRecord.getOriginalQueue().getIdentifier())
- .append("/");
+ .append(", ");
}
details.append("filename=")
.append(repoRecord.getCurrent().getAttribute(CoreAttributes.FILENAME.key()))
- .append("/uuid=")
+ .append(", uuid=")
.append(repoRecord.getCurrent().getAttribute(CoreAttributes.UUID.key()));
}
if (records.size() > MAX_ROLLBACK_FLOWFILES_TO_LOG) {
@@ -1341,7 +1341,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
details.append(", ");
}
details.append(records.size() - MAX_ROLLBACK_FLOWFILES_TO_LOG)
- .append(" additional Flowfiles not listed");
+ .append(" additional FlowFiles not listed");
} else if (filesListed == 0) {
details.append("none");
}
@@ -1440,8 +1440,6 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
@Override
public void migrate(final ProcessSession newOwner, final Collection<FlowFile> flowFiles) {
- verifyTaskActive();
-
if (Objects.requireNonNull(newOwner) == this) {
throw new IllegalArgumentException("Cannot migrate FlowFiles from a Process Session to itself");
}
@@ -1457,188 +1455,200 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
migrate((StandardProcessSession) newOwner, flowFiles);
}
- private void migrate(final StandardProcessSession newOwner, Collection<FlowFile> flowFiles) {
- // We don't call validateRecordState() here because we want to allow migration of FlowFiles that have already been marked as removed or transferred, etc.
- flowFiles = flowFiles.stream().map(this::getMostRecent).collect(Collectors.toList());
+ private synchronized void migrate(final StandardProcessSession newOwner, Collection<FlowFile> flowFiles) {
+ // This method will update many member variables/internal state of both `this` and `newOwner`. These member variables may also be updated during
+ // session rollback, such as when a Processor is terminated. As such, we need to ensure that we synchronize on both `this` and `newOwner` so that
+ // neither can be rolled back while we are in the process of migrating FlowFiles from one session to another.
+ //
+ // We must also ensure that we verify that both sessions are in an amenable state to perform this transference after obtaining the synchronization lock.
+ // We synchronize on 'this' by marking the method synchronized. Because the only way in which one Process Session will call into another is via this migrate() method,
+ // we do not need to worry about the order in which the synchronized lock is obtained.
+ synchronized (newOwner) {
+ verifyTaskActive();
+ newOwner.verifyTaskActive();
- for (final FlowFile flowFile : flowFiles) {
- if (openInputStreams.containsKey(flowFile)) {
- throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
- + "has an open InputStream for the FlowFile, created by calling ProcessSession.read(FlowFile)");
- }
+ // We don't call validateRecordState() here because we want to allow migration of FlowFiles that have already been marked as removed or transferred, etc.
+ flowFiles = flowFiles.stream().map(this::getMostRecent).collect(Collectors.toList());
- if (openOutputStreams.containsKey(flowFile)) {
- throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
- + "has an open OutputStream for the FlowFile, created by calling ProcessSession.write(FlowFile)");
- }
+ for (final FlowFile flowFile : flowFiles) {
+ if (openInputStreams.containsKey(flowFile)) {
+ throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
+ + "has an open InputStream for the FlowFile, created by calling ProcessSession.read(FlowFile)");
+ }
- if (readRecursionSet.containsKey(flowFile)) {
- throw new IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
- }
- if (writeRecursionSet.contains(flowFile)) {
- throw new IllegalStateException(flowFile + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed");
- }
+ if (openOutputStreams.containsKey(flowFile)) {
+ throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
+ + "has an open OutputStream for the FlowFile, created by calling ProcessSession.write(FlowFile)");
+ }
- final StandardRepositoryRecord record = getRecord(flowFile);
- if (record == null) {
- throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")");
- }
- }
+ if (readRecursionSet.containsKey(flowFile)) {
+ throw new IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
+ }
+ if (writeRecursionSet.contains(flowFile)) {
+ throw new IllegalStateException(flowFile + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed");
+ }
- // If we have a FORK event for one of the given FlowFiles, then all children must also be migrated. Otherwise, we
- // could have a case where we have FlowFile A transferred and eventually exiting the flow and later the 'newOwner'
- // ProcessSession is committed, claiming to have created FlowFiles from the parent, which is no longer even in
- // the flow. This would be very confusing when looking at the provenance for the FlowFile, so it is best to avoid this.
- final Set<String> flowFileIds = flowFiles.stream()
- .map(ff -> ff.getAttribute(CoreAttributes.UUID.key()))
- .collect(Collectors.toSet());
+ final StandardRepositoryRecord record = getRecord(flowFile);
+ if (record == null) {
+ throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")");
+ }
+ }
- for (final Map.Entry<FlowFile, ProvenanceEventBuilder> entry : forkEventBuilders.entrySet()) {
- final FlowFile eventFlowFile = entry.getKey();
- if (flowFiles.contains(eventFlowFile)) {
- final ProvenanceEventBuilder eventBuilder = entry.getValue();
- for (final String childId : eventBuilder.getChildFlowFileIds()) {
- if (!flowFileIds.contains(childId)) {
- throw new FlowFileHandlingException("Cannot migrate " + eventFlowFile + " to a new session because it was forked to create " + eventBuilder.getChildFlowFileIds().size()
- + " children and not all children are being migrated. If any FlowFile is forked, all of its children must also be migrated at the same time as the forked FlowFile");
+ // If we have a FORK event for one of the given FlowFiles, then all children must also be migrated. Otherwise, we
+ // could have a case where we have FlowFile A transferred and eventually exiting the flow and later the 'newOwner'
+ // ProcessSession is committed, claiming to have created FlowFiles from the parent, which is no longer even in
+ // the flow. This would be very confusing when looking at the provenance for the FlowFile, so it is best to avoid this.
+ final Set<String> flowFileIds = flowFiles.stream()
+ .map(ff -> ff.getAttribute(CoreAttributes.UUID.key()))
+ .collect(Collectors.toSet());
+
+ for (final Map.Entry<FlowFile, ProvenanceEventBuilder> entry : forkEventBuilders.entrySet()) {
+ final FlowFile eventFlowFile = entry.getKey();
+ if (flowFiles.contains(eventFlowFile)) {
+ final ProvenanceEventBuilder eventBuilder = entry.getValue();
+ for (final String childId : eventBuilder.getChildFlowFileIds()) {
+ if (!flowFileIds.contains(childId)) {
+ throw new FlowFileHandlingException("Cannot migrate " + eventFlowFile + " to a new session because it was forked to create " + eventBuilder.getChildFlowFileIds().size()
+ + " children and not all children are being migrated. If any FlowFile is forked, all of its children must also be migrated at the same time as the forked FlowFile");
+ }
}
- }
- } else {
- final ProvenanceEventBuilder eventBuilder = entry.getValue();
- for (final String childId : eventBuilder.getChildFlowFileIds()) {
- if (flowFileIds.contains(childId)) {
- throw new FlowFileHandlingException("Cannot migrate " + eventFlowFile + " to a new session because it was forked from a Parent FlowFile, but the parent is not being migrated. "
- + "If any FlowFile is forked, the parent and all children must be migrated at the same time.");
+ } else {
+ final ProvenanceEventBuilder eventBuilder = entry.getValue();
+ for (final String childId : eventBuilder.getChildFlowFileIds()) {
+ if (flowFileIds.contains(childId)) {
+ throw new FlowFileHandlingException("Cannot migrate " + eventFlowFile + " to a new session because it was forked from a Parent FlowFile, " +
+ "but the parent is not being migrated. If any FlowFile is forked, the parent and all children must be migrated at the same time.");
+ }
}
}
}
- }
- // If we have a FORK event where a FlowFile is a child of the FORK event, we want to create a FORK
- // event builder for the new owner of the FlowFile and remove the child from our fork event builder.
- final Set<FlowFile> forkedFlowFilesMigrated = new HashSet<>();
- for (final Map.Entry<FlowFile, ProvenanceEventBuilder> entry : forkEventBuilders.entrySet()) {
- final FlowFile eventFlowFile = entry.getKey();
- final ProvenanceEventBuilder eventBuilder = entry.getValue();
+ // If we have a FORK event where a FlowFile is a child of the FORK event, we want to create a FORK
+ // event builder for the new owner of the FlowFile and remove the child from our fork event builder.
+ final Set<FlowFile> forkedFlowFilesMigrated = new HashSet<>();
+ for (final Map.Entry<FlowFile, ProvenanceEventBuilder> entry : forkEventBuilders.entrySet()) {
+ final FlowFile eventFlowFile = entry.getKey();
+ final ProvenanceEventBuilder eventBuilder = entry.getValue();
- // If the FlowFile that the event is attached to is not being migrated, we should not migrate the fork event builder either.
- if (!flowFiles.contains(eventFlowFile)) {
- continue;
- }
+ // If the FlowFile that the event is attached to is not being migrated, we should not migrate the fork event builder either.
+ if (!flowFiles.contains(eventFlowFile)) {
+ continue;
+ }
- final Set<String> childrenIds = new HashSet<>(eventBuilder.getChildFlowFileIds());
+ final Set<String> childrenIds = new HashSet<>(eventBuilder.getChildFlowFileIds());
- ProvenanceEventBuilder copy = null;
- for (final FlowFile flowFile : flowFiles) {
- final String flowFileId = flowFile.getAttribute(CoreAttributes.UUID.key());
- if (childrenIds.contains(flowFileId)) {
- eventBuilder.removeChildFlowFile(flowFile);
+ ProvenanceEventBuilder copy = null;
+ for (final FlowFile flowFile : flowFiles) {
+ final String flowFileId = flowFile.getAttribute(CoreAttributes.UUID.key());
+ if (childrenIds.contains(flowFileId)) {
+ eventBuilder.removeChildFlowFile(flowFile);
- if (copy == null) {
- copy = eventBuilder.copy();
- copy.getChildFlowFileIds().clear();
+ if (copy == null) {
+ copy = eventBuilder.copy();
+ copy.getChildFlowFileIds().clear();
+ }
+ copy.addChildFlowFile(flowFileId);
}
- copy.addChildFlowFile(flowFileId);
}
- }
- if (copy != null) {
- newOwner.forkEventBuilders.put(eventFlowFile, copy);
- forkedFlowFilesMigrated.add(eventFlowFile);
+ if (copy != null) {
+ newOwner.forkEventBuilders.put(eventFlowFile, copy);
+ forkedFlowFilesMigrated.add(eventFlowFile);
+ }
}
- }
- forkedFlowFilesMigrated.forEach(forkEventBuilders::remove);
+ forkedFlowFilesMigrated.forEach(forkEventBuilders::remove);
- newOwner.processingStartTime = Math.min(newOwner.processingStartTime, processingStartTime);
+ newOwner.processingStartTime = Math.min(newOwner.processingStartTime, processingStartTime);
- for (final FlowFile flowFile : flowFiles) {
- final FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile;
+ for (final FlowFile flowFile : flowFiles) {
+ final FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile;
- final long flowFileId = flowFile.getId();
- final StandardRepositoryRecord repoRecord = this.records.remove(flowFileId);
- newOwner.records.put(flowFileId, repoRecord);
+ final long flowFileId = flowFile.getId();
+ final StandardRepositoryRecord repoRecord = this.records.remove(flowFileId);
+ newOwner.records.put(flowFileId, repoRecord);
- final Collection<Long> linkedIds = this.flowFileLinkage.remove(flowFileId);
- if (linkedIds != null) {
- linkedIds.forEach(linkedId -> newOwner.flowFileLinkage.addLink(flowFileId, linkedId));
- }
+ final Collection<Long> linkedIds = this.flowFileLinkage.remove(flowFileId);
+ if (linkedIds != null) {
+ linkedIds.forEach(linkedId -> newOwner.flowFileLinkage.addLink(flowFileId, linkedId));
+ }
- // Adjust the counts for Connections for each FlowFile that was pulled from a Connection.
- // We do not have to worry about accounting for 'input counts' on connections because those
- // are incremented only during a checkpoint, and anything that's been checkpointed has
- // also been committed above.
- final FlowFileQueue inputQueue = repoRecord.getOriginalQueue();
- if (inputQueue != null) {
- final String connectionId = inputQueue.getIdentifier();
- incrementConnectionOutputCounts(connectionId, -1, -repoRecord.getOriginal().getSize());
- newOwner.incrementConnectionOutputCounts(connectionId, 1, repoRecord.getOriginal().getSize());
+ // Adjust the counts for Connections for each FlowFile that was pulled from a Connection.
+ // We do not have to worry about accounting for 'input counts' on connections because those
+ // are incremented only during a checkpoint, and anything that's been checkpointed has
+ // also been committed above.
+ final FlowFileQueue inputQueue = repoRecord.getOriginalQueue();
+ if (inputQueue != null) {
+ final String connectionId = inputQueue.getIdentifier();
+ incrementConnectionOutputCounts(connectionId, -1, -repoRecord.getOriginal().getSize());
+ newOwner.incrementConnectionOutputCounts(connectionId, 1, repoRecord.getOriginal().getSize());
- unacknowledgedFlowFiles.get(inputQueue).remove(flowFile);
- newOwner.unacknowledgedFlowFiles.computeIfAbsent(inputQueue, queue -> new HashSet<>()).add(flowFileRecord);
+ unacknowledgedFlowFiles.get(inputQueue).remove(flowFile);
+ newOwner.unacknowledgedFlowFiles.computeIfAbsent(inputQueue, queue -> new HashSet<>()).add(flowFileRecord);
- flowFilesIn--;
- contentSizeIn -= flowFile.getSize();
+ flowFilesIn--;
+ contentSizeIn -= flowFile.getSize();
- newOwner.flowFilesIn++;
- newOwner.contentSizeIn += flowFile.getSize();
- }
+ newOwner.flowFilesIn++;
+ newOwner.contentSizeIn += flowFile.getSize();
+ }
- final String flowFileUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
- if (removedFlowFiles.remove(flowFileUuid)) {
- newOwner.removedFlowFiles.add(flowFileUuid);
- newOwner.removedCount++;
- newOwner.removedBytes += flowFile.getSize();
+ final String flowFileUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+ if (removedFlowFiles.remove(flowFileUuid)) {
+ newOwner.removedFlowFiles.add(flowFileUuid);
+ newOwner.removedCount++;
+ newOwner.removedBytes += flowFile.getSize();
- removedCount--;
- removedBytes -= flowFile.getSize();
- }
+ removedCount--;
+ removedBytes -= flowFile.getSize();
+ }
- if (createdFlowFiles.remove(flowFileUuid)) {
- newOwner.createdFlowFiles.add(flowFileUuid);
- }
+ if (createdFlowFiles.remove(flowFileUuid)) {
+ newOwner.createdFlowFiles.add(flowFileUuid);
+ }
- if (repoRecord.getTransferRelationship() != null) {
- final Relationship transferRelationship = repoRecord.getTransferRelationship();
- final Collection<Connection> destinations = context.getConnections(transferRelationship);
- final int numDestinations = destinations.size();
- final boolean autoTerminated = numDestinations == 0 && context.getConnectable().isAutoTerminated(transferRelationship);
+ if (repoRecord.getTransferRelationship() != null) {
+ final Relationship transferRelationship = repoRecord.getTransferRelationship();
+ final Collection<Connection> destinations = context.getConnections(transferRelationship);
+ final int numDestinations = destinations.size();
+ final boolean autoTerminated = numDestinations == 0 && context.getConnectable().isAutoTerminated(transferRelationship);
- if (autoTerminated) {
- removedCount--;
- removedBytes -= flowFile.getSize();
+ if (autoTerminated) {
+ removedCount--;
+ removedBytes -= flowFile.getSize();
- newOwner.removedCount++;
- newOwner.removedBytes += flowFile.getSize();
- } else {
- flowFilesOut--;
- contentSizeOut -= flowFile.getSize();
+ newOwner.removedCount++;
+ newOwner.removedBytes += flowFile.getSize();
+ } else {
+ flowFilesOut--;
+ contentSizeOut -= flowFile.getSize();
- newOwner.flowFilesOut++;
- newOwner.contentSizeOut += flowFile.getSize();
+ newOwner.flowFilesOut++;
+ newOwner.contentSizeOut += flowFile.getSize();
+ }
}
- }
- final List<ProvenanceEventRecord> events = generatedProvenanceEvents.remove(flowFile);
- if (events != null) {
- newOwner.generatedProvenanceEvents.put(flowFile, events);
- }
+ final List<ProvenanceEventRecord> events = generatedProvenanceEvents.remove(flowFile);
+ if (events != null) {
+ newOwner.generatedProvenanceEvents.put(flowFile, events);
+ }
- final ContentClaim currentClaim = repoRecord.getCurrentClaim();
- if (currentClaim != null) {
- final ByteCountingOutputStream appendableStream = appendableStreams.remove(currentClaim);
- if (appendableStream != null) {
- newOwner.appendableStreams.put(currentClaim, appendableStream);
+ final ContentClaim currentClaim = repoRecord.getCurrentClaim();
+ if (currentClaim != null) {
+ final ByteCountingOutputStream appendableStream = appendableStreams.remove(currentClaim);
+ if (appendableStream != null) {
+ newOwner.appendableStreams.put(currentClaim, appendableStream);
+ }
}
- }
- final Path toDelete = deleteOnCommit.remove(flowFile);
- if (toDelete != null) {
- newOwner.deleteOnCommit.put(flowFile, toDelete);
+ final Path toDelete = deleteOnCommit.remove(flowFile);
+ if (toDelete != null) {
+ newOwner.deleteOnCommit.put(flowFile, toDelete);
+ }
}
- }
- provenanceReporter.migrate(newOwner.provenanceReporter, flowFileIds);
+ provenanceReporter.migrate(newOwner.provenanceReporter, flowFileIds);
+ }
}
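As the new comments in the hunk above explain, migrate() now holds two monitors: `this` (via the synchronized method) and `newOwner` (via the nested synchronized block), relying on migrate() being the only path by which one session ever locks another to sidestep lock-ordering concerns. A minimal sketch of that scheme, using a hypothetical Session type rather than NiFi's API:

// Hypothetical sketch of the two-lock scheme used by migrate() above.
class Session {
    private boolean active = true;

    synchronized void migrateTo(final Session newOwner) {
        synchronized (newOwner) {
            // Verify both sessions only after both locks are held, so neither
            // can be rolled back mid-migration.
            verifyActive();
            newOwner.verifyActive();
            // ... move per-FlowFile state from this session to newOwner ...
        }
    }

    synchronized void rollback() {
        // Also synchronized on `this`, so it cannot interleave with an
        // in-flight migration involving this session.
        active = false;
    }

    private void verifyActive() {
        if (!active) {
            throw new IllegalStateException("Session is no longer active");
        }
    }
}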
@@ -1793,11 +1803,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
flowFilesIn++;
contentSizeIn += flowFile.getSize();
- Set<FlowFileRecord> set = unacknowledgedFlowFiles.get(connection.getFlowFileQueue());
- if (set == null) {
- set = new HashSet<>();
- unacknowledgedFlowFiles.put(connection.getFlowFileQueue(), set);
- }
+ final Set<FlowFileRecord> set = unacknowledgedFlowFiles.computeIfAbsent(connection.getFlowFileQueue(), k -> new HashSet<>());
set.add(flowFile);
incrementConnectionOutputCounts(connection, flowFile);
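The last hunk above also replaces a get/null-check/put sequence with Map.computeIfAbsent. A minimal before/after sketch, with hypothetical String keys standing in for queues and FlowFiles:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of the refactoring in the hunk above (hypothetical types).
class ComputeIfAbsentSketch {
    private final Map<String, Set<String>> unacknowledged = new HashMap<>();

    void before(final String queue, final String flowFile) {
        Set<String> set = unacknowledged.get(queue);
        if (set == null) {
            set = new HashSet<>();
            unacknowledged.put(queue, set);
        }
        set.add(flowFile);
    }

    void after(final String queue, final String flowFile) {
        // Creates the set only when absent, in a single expression.
        unacknowledged.computeIfAbsent(queue, k -> new HashSet<>()).add(flowFile);
    }
}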
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/LifecycleState.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/LifecycleState.java
index c2dd4e0..c5b78c9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/LifecycleState.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/LifecycleState.java
@@ -23,7 +23,9 @@ import org.apache.nifi.processor.exception.TerminatedTaskException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
+import java.util.WeakHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -36,7 +38,7 @@ public class LifecycleState {
private final AtomicBoolean mustCallOnStoppedMethods = new AtomicBoolean(false);
private volatile long lastStopTime = -1;
private volatile boolean terminated = false;
- private final Set<ActiveProcessSessionFactory> activeProcessSessionFactories = Collections.synchronizedSet(new HashSet<>());
+ private final Map<ActiveProcessSessionFactory, Object> activeProcessSessionFactories = new WeakHashMap<>();
public synchronized int incrementActiveThreadCount(final ActiveProcessSessionFactory sessionFactory) {
if (terminated) {
@@ -44,21 +46,29 @@ public class LifecycleState {
}
if (sessionFactory != null) {
- activeProcessSessionFactories.add(sessionFactory);
+ // If a session factory is provided, add it to our WeakHashMap. The value that we use is not relevant,
+ // as this just serves, essentially, as a WeakHashSet, but there is no WeakHashSet implementation.
+ // We need to keep track of any ActiveProcessSessionFactory that has been created for this component,
+ // as long as the session factory has not been garbage collected. This is important because when we offload
+ // a node, we will terminate all active processors and we need the ability to terminate any active sessions
+ // at that time. We cannot simply store a Set of all ActiveProcessSessionFactories and then remove them in the
+ // decrementActiveThreadCount because a Processor may choose to continue using the ProcessSessionFactory even after
+ // returning from its onTrigger method.
+ //
+ // For example, it may stash the ProcessSessionFactory away in a member variable in order to aggregate FlowFiles across
+ // many onTrigger invocations. In this case, we need the ability to force the rollback of any created session upon Processor
+ // termination.
+ activeProcessSessionFactories.put(sessionFactory, null);
}
return activeThreadCount.incrementAndGet();
}
- public synchronized int decrementActiveThreadCount(final ActiveProcessSessionFactory sessionFactory) {
+ public synchronized int decrementActiveThreadCount() {
if (terminated) {
return activeThreadCount.get();
}
- if (sessionFactory != null) {
- activeProcessSessionFactories.remove(sessionFactory);
- }
-
return activeThreadCount.decrementAndGet();
}
@@ -85,8 +95,7 @@ public class LifecycleState {
@Override
public String toString() {
- return new StringBuilder().append("activeThreads:").append(activeThreadCount.get()).append("; ")
- .append("scheduled:").append(scheduled.get()).append("; ").toString();
+ return "LifecycleState[activeThreads=" + activeThreadCount.get() + ", scheduled=" + scheduled.get() + "]";
}
/**
@@ -123,7 +132,8 @@ public class LifecycleState {
this.terminated = true;
activeThreadCount.set(0);
- for (final ActiveProcessSessionFactory factory : activeProcessSessionFactories) {
+ // Terminate any active sessions.
+ for (final ActiveProcessSessionFactory factory : activeProcessSessionFactories.keySet()) {
factory.terminateActiveSessions();
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueContents.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueContents.java
index 60ad64d..79fffab 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueContents.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueContents.java
@@ -43,4 +43,9 @@ public class FlowFileQueueContents {
public QueueSize getSwapSize() {
return swapSize;
}
+
+ @Override
+ public String toString() {
+ return "FlowFileQueueContents[swapLocations=" + swapLocations + ",
swapSize=" + swapSize + ", activeFlowFiles=" + activeFlowFiles + "]";
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
index c8f2b18..d419945 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
@@ -501,7 +501,12 @@ public class SwappablePriorityQueue {
}
public void acknowledge(final Collection<FlowFileRecord> flowFiles) {
- logger.trace("{} Acknowledging {}", this, flowFiles);
+ if (logger.isTraceEnabled()) {
+ for (final FlowFileRecord flowFile : flowFiles) {
+ logger.trace("{} Acknowledging {}", this, flowFile);
+ }
+ }
+
final long totalSize = flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum();
incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
}
@@ -627,8 +632,10 @@ public class SwappablePriorityQueue {
writeLock.unlock("poll(int, Set)");
}
- if (!records.isEmpty()) {
- logger.trace("{} poll() returning {}", this, records);
+ if (!records.isEmpty() && logger.isTraceEnabled()) {
+ for (final FlowFileRecord flowFile : records) {
+ logger.trace("{} poll() returning {}", this, flowFile);
+ }
}
return records;
@@ -690,8 +697,10 @@ public class SwappablePriorityQueue {
this.activeQueue.addAll(unselected);
incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
- if (!selectedFlowFiles.isEmpty()) {
- logger.trace("{} poll() returning {}", this, selectedFlowFiles);
+ if (!selectedFlowFiles.isEmpty() && logger.isTraceEnabled()) {
+ for (final FlowFileRecord flowFile : selectedFlowFiles) {
+ logger.trace("{} poll() returning {}", this, flowFile);
+ }
}
return selectedFlowFiles;
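The logging changes in this file wrap per-FlowFile TRACE statements in an isTraceEnabled() guard, so the collection is neither iterated nor stringified when TRACE is off, and each log line stays bounded to a single record. A minimal SLF4J sketch of the pattern (hypothetical record type):

import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch of the guarded per-element trace logging used above.
class TraceGuardSketch {
    private static final Logger logger = LoggerFactory.getLogger(TraceGuardSketch.class);

    void logPolled(final List<String> records) {
        if (!records.isEmpty() && logger.isTraceEnabled()) {
            for (final String record : records) {
                logger.trace("{} poll() returning {}", this, record);
            }
        }
    }
}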
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 2a0c504..9c58575 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -244,7 +244,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
return;
}
- logger.debug("Setting queue {} on node {} as offloaded", this, clusterCoordinator.getLocalNodeIdentifier());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Setting queue {} on node {} as offloaded. Current size: {}, Partition Sizes: {}", this, clusterCoordinator.getLocalNodeIdentifier(), size(), getPartitionSizes());
+ }
+
offloaded = true;
partitionWriteLock.lock();
@@ -271,11 +274,30 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
// Update our partitioner so that we don't keep any data on the local partition
setFlowFilePartitioner(new NonLocalPartitionPartitioner());
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Queue {} has now updated Partition on node {}
for offload. Current size: {}, Partition Sizes: {}",
+ this, clusterCoordinator.getLocalNodeIdentifier(), size(),
getPartitionSizes());
+ }
} finally {
partitionWriteLock.unlock();
}
}
+ private Map<QueuePartition, QueueSize> getPartitionSizes() {
+ partitionReadLock.lock();
+ try {
+ final Map<QueuePartition, QueueSize> sizeMap = new HashMap<>();
+ for (final QueuePartition partition : queuePartitions) {
+ sizeMap.put(partition, partition.size());
+ }
+
+ return sizeMap;
+ } finally {
+ partitionReadLock.unlock();
+ }
+ }
+
@Override
public void resetOffloadedQueue() {
if (clusterCoordinator == null) {
@@ -899,7 +921,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
final List<FlowFileRecord> flowFileList = (flowFiles instanceof List) ? (List<FlowFileRecord>) flowFiles : new ArrayList<>(flowFiles);
partitionMap = Collections.singletonMap(partition, flowFileList);
- logger.debug("Partitioner is static so Partitioned FlowFiles as: {}", partitionMap);
+ logger.debug("Partitioner {} is static so Partitioned FlowFiles as: {}", partitioner, partitionMap);
return partitionMap;
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java
index de46d17..ce0f8e3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java
@@ -30,6 +30,8 @@ import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
@@ -39,7 +41,9 @@ import java.util.Optional;
import java.util.Set;
public class StandardRebalancingPartition implements RebalancingPartition {
- private final String SWAP_PARTITION_NAME = "rebalance";
+ private static final Logger logger = LoggerFactory.getLogger(StandardRebalancingPartition.class);
+ private static final String SWAP_PARTITION_NAME = "rebalance";
+
private final String queueIdentifier;
private final BlockingSwappablePriorityQueue queue;
private final LoadBalancedFlowFileQueue flowFileQueue;
@@ -127,11 +131,13 @@ public class StandardRebalancingPartition implements RebalancingPartition {
private synchronized void rebalanceFromQueue() {
if (stopped) {
+ logger.debug("Will not rebalance from queue because {} is
stopped", this);
return;
}
// If a task is already defined, do nothing. There's already a thread running.
if (rebalanceTask != null) {
+ logger.debug("Rebalance Task already exists for {}", this);
return;
}
@@ -140,6 +146,7 @@ public class StandardRebalancingPartition implements RebalancingPartition {
final Thread rebalanceThread = new Thread(this.rebalanceTask);
rebalanceThread.setName("Rebalance queued data for Connection " + queueIdentifier);
rebalanceThread.start();
+ logger.debug("No Rebalance Task currently exists for {}. Starting new Rebalance Thread {}", this, rebalanceThread);
}
@Override
@@ -148,12 +155,16 @@ public class StandardRebalancingPartition implements RebalancingPartition {
return;
}
+ logger.debug("Adding {} to Rebalance queue for {}", queueContents, this);
+
queue.inheritQueueContents(queueContents);
rebalanceFromQueue();
}
@Override
public void rebalance(final Collection<FlowFileRecord> flowFiles) {
+ logger.debug("Adding {} to Rebalance queue for {}", flowFiles, this);
+
queue.putAll(flowFiles);
rebalanceFromQueue();
}
@@ -163,7 +174,7 @@ public class StandardRebalancingPartition implements RebalancingPartition {
return queue.packageForRebalance(newPartitionName);
}
- private synchronized boolean complete() {
+ private synchronized boolean isComplete() {
if (!queue.isEmpty()) {
return false;
}
@@ -201,7 +212,8 @@ public class StandardRebalancingPartition implements RebalancingPartition {
if (polled == null) {
flowFileQueue.handleExpiredRecords(expiredRecords);
- if (complete()) {
+ if (isComplete()) {
+ logger.debug("Rebalance Task completed for {}", this);
return;
} else {
continue;
@@ -217,6 +229,8 @@ public class StandardRebalancingPartition implements RebalancingPartition {
flowFileQueue.handleExpiredRecords(expiredRecords);
+ logger.debug("{} Rebalancing {}", this, toDistribute);
+
// Transfer all of the FlowFiles that we got back to the FlowFileQueue itself. This will cause the data to be
// re-partitioned and binned appropriately. We also then need to ensure that we acknowledge the data from our
// own SwappablePriorityQueue to ensure that the sizes are kept in check.
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
index 4ccccd9..6751663 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
@@ -316,7 +316,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
// reaching the maximum number of threads. we won't know this until we atomically increment the thread count
// on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
// result in using more than the maximum number of defined threads
- scheduleState.decrementActiveThreadCount(sessionFactory);
+ scheduleState.decrementActiveThreadCount();
return;
}
@@ -344,7 +344,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
}
}
- scheduleState.decrementActiveThreadCount(sessionFactory);
+ scheduleState.decrementActiveThreadCount();
}
}
@@ -357,7 +357,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
// reaching the maximum number of threads. we won't know this until we atomically increment the thread count
// on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
// result in using more than the maximum number of defined threads
- scheduleState.decrementActiveThreadCount(sessionFactory);
+ scheduleState.decrementActiveThreadCount();
return;
}
@@ -386,7 +386,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
}
}
- scheduleState.decrementActiveThreadCount(sessionFactory);
+ scheduleState.decrementActiveThreadCount();
}
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 8062122..a7dc118 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -343,7 +343,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
@Override
public void onTaskComplete() {
- lifecycleState.decrementActiveThreadCount(null);
+ lifecycleState.decrementActiveThreadCount();
}
};
@@ -383,7 +383,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
@Override
public void onTaskComplete() {
- lifecycleState.decrementActiveThreadCount(null);
+ lifecycleState.decrementActiveThreadCount();
}
};
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
index 5ade471..9b2a555 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
@@ -283,7 +283,7 @@ public class ConnectableTask {
logger.error("", e);
}
} finally {
- scheduleState.decrementActiveThreadCount(activeSessionFactory);
+ scheduleState.decrementActiveThreadCount();
Thread.currentThread().setName(originalThreadName);
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
index b60302a..58ce022 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
@@ -58,7 +58,7 @@ public class ReportingTaskWrapper implements Runnable {
}
}
} finally {
- lifecycleState.decrementActiveThreadCount(null);
+ lifecycleState.decrementActiveThreadCount();
}
}
}
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessSchedulingAgent.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessSchedulingAgent.java
index c772047..94dc01c 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessSchedulingAgent.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessSchedulingAgent.java
@@ -88,7 +88,7 @@ public class StatelessSchedulingAgent implements SchedulingAgent {
try {
taskNode.getReportingTask().onTrigger(taskNode.getReportingContext());
} finally {
- scheduleState.decrementActiveThreadCount(null);
+ scheduleState.decrementActiveThreadCount();
}
} catch (final Throwable t) {