This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new f08c2ee NIFI-6285: Addressed issue that resulted in swapped data not
being swapped back in if load balancing strategy changed while data was swapped
out; added integration tests for swapping. In testing, also encountered an
issue with data being swapped out while swap files were being recovered causing
the queue size to be wrong and causing errors about not being able to swap data
in, because it attempted to swap the data in twice.
f08c2ee is described below
commit f08c2ee43f39584b7d4e76d61d7d4aaa889068fd
Author: Mark Payne <[email protected]>
AuthorDate: Mon May 13 11:28:21 2019 -0400
NIFI-6285: Addressed issue that resulted in swapped data not being swapped
back in if load balancing strategy changed while data was swapped out; added
integration tests for swapping. In testing, also encountered an issue with data
being swapped out while swap files were being recovered causing the queue size
to be wrong and causing errors about not being able to swap data in, because it
attempted to swap the data in twice.
This closes #3473.
Signed-off-by: Bryan Bende <[email protected]>
---
.../nifi/controller/FileSystemSwapManager.java | 1 +
.../org/apache/nifi/controller/FlowController.java | 4 +
.../controller/queue/SwappablePriorityQueue.java | 55 +++--
.../clustered/SocketLoadBalancedFlowFileQueue.java | 6 +
.../async/nio/NioAsyncLoadBalanceClientTask.java | 5 +-
.../repository/WriteAheadFlowFileRepository.java | 32 ++-
.../manager/StandardStateManagerProvider.java | 12 +-
.../TestWriteAheadFlowFileRepository.java | 22 +-
.../nifi/integration/FrameworkIntegrationTest.java | 270 +++++++++++++++------
.../lifecycle/FlowFileRepositoryLifecycleIT.java | 3 +-
.../integration/processor/BiConsumerProcessor.java | 12 +-
.../integration/processors/GenerateProcessor.java | 81 +++++++
.../NopProcessor.java} | 33 +--
.../TerminateAll.java} | 37 +--
.../TerminateOnce.java} | 36 +--
.../nifi/integration/swap/ClusteredSwapFileIT.java | 249 +++++++++++++++++++
.../integration/swap/StandaloneSwapFileIT.java | 93 +++++++
.../resources/int-tests/clustered-nifi.properties | 256 +++++++++++++++++++
.../test/resources/int-tests/state-management.xml | 8 +
.../test/resources/int-tests/zookeeper.properties | 45 ++++
20 files changed, 1050 insertions(+), 210 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 8a0aa3b..d41ded5 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -463,6 +463,7 @@ public class FileSystemSwapManager implements
FlowFileSwapManager {
// Use Files.move and convert to Path's instead of File.rename so that
we get an IOException on failure that describes why we failed.
Files.move(existingFile.toPath(), newFile.toPath());
+ logger.debug("Changed Partition for Swap File by renaming from {} to
{}", swapLocation, newPartitionName);
return newFile.getAbsolutePath();
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 4141832..b7a5d79 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2650,6 +2650,10 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
return replayEvent;
}
+ public ResourceClaimManager getResourceClaimManager() {
+ return resourceClaimManager;
+ }
+
public boolean isConnected() {
rwLock.readLock().lock();
try {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
index df19f44..8613130 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
@@ -41,6 +41,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
@@ -195,6 +196,8 @@ public class SwappablePriorityQueue {
final String swapLocation = swapManager.swapOut(toSwap,
flowFileQueue, swapPartitionName);
swapLocations.add(swapLocation);
+ logger.debug("Successfully wrote out Swap File {} containing
{} FlowFiles ({} bytes)", swapLocation, toSwap.size(),
bytesSwappedThisIteration);
+
bytesSwappedOut += bytesSwappedThisIteration;
flowFilesSwappedOut += toSwap.size();
} catch (final IOException ioe) {
@@ -255,6 +258,7 @@ public class SwappablePriorityQueue {
}
this.swapLocations.addAll(swapLocations);
+ logger.debug("After writing swap files, setting new set of Swap
Locations to {}", this.swapLocations);
}
private int getFlowFileCount() {
@@ -337,6 +341,7 @@ public class SwappablePriorityQueue {
boolean partialContents = false;
SwapContents swapContents;
try {
+ logger.debug("Attempting to swap in {}; all swap locations = {}",
swapLocation, swapLocations);
swapContents = swapManager.swapIn(swapLocation, flowFileQueue);
swapLocations.remove(0);
} catch (final IncompleteSwapFileException isfe) {
@@ -391,7 +396,7 @@ public class SwappablePriorityQueue {
} else {
// we swapped in the whole swap file. We can just use the info
that we got from the summary.
incrementActiveQueueSize(flowFileCount, contentSize);
- logger.debug("Successfully swapped in Swap File {}", swapLocation);
+ logger.debug("Successfully swapped in Swap File {} containing {}
FlowFiles ({} bytes)", swapLocation, flowFileCount, contentSize);
}
activeQueue.addAll(swapContents.getFlowFiles());
@@ -411,12 +416,12 @@ public class SwappablePriorityQueue {
}
public void acknowledge(final FlowFileRecord flowFile) {
- logger.debug("{} Acknowledging {}", this, flowFile);
+ logger.trace("{} Acknowledging {}", this, flowFile);
incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
}
public void acknowledge(final Collection<FlowFileRecord> flowFiles) {
- logger.debug("{} Acknowledging {}", this, flowFiles);
+ logger.trace("{} Acknowledging {}", this, flowFiles);
final long totalSize =
flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum();
incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
}
@@ -435,7 +440,7 @@ public class SwappablePriorityQueue {
activeQueue.add(flowFile);
}
- logger.debug("{} put to {}", flowFile, this);
+ logger.trace("{} put to {}", flowFile, this);
} finally {
writeLock.unlock("put(FlowFileRecord)");
}
@@ -460,7 +465,7 @@ public class SwappablePriorityQueue {
activeQueue.addAll(flowFiles);
}
- logger.debug("{} put to {}", flowFiles, this);
+ logger.trace("{} put to {}", flowFiles, this);
} finally {
writeLock.unlock("putAll");
}
@@ -475,7 +480,7 @@ public class SwappablePriorityQueue {
flowFile = doPoll(expiredRecords, expirationMillis);
if (flowFile != null) {
- logger.debug("{} poll() returning {}", this, flowFile);
+ logger.trace("{} poll() returning {}", this, flowFile);
incrementUnacknowledgedQueueSize(1, flowFile.getSize());
}
@@ -535,7 +540,7 @@ public class SwappablePriorityQueue {
}
if (!records.isEmpty()) {
- logger.debug("{} poll() returning {}", this, records);
+ logger.trace("{} poll() returning {}", this, records);
}
return records;
@@ -594,7 +599,7 @@ public class SwappablePriorityQueue {
incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
if (!selectedFlowFiles.isEmpty()) {
- logger.debug("{} poll() returning {}", this,
selectedFlowFiles);
+ logger.trace("{} poll() returning {}", this,
selectedFlowFiles);
}
return selectedFlowFiles;
@@ -816,12 +821,13 @@ public class SwappablePriorityQueue {
Long maxId = null;
List<ResourceClaim> resourceClaims = new ArrayList<>();
final long startNanos = System.nanoTime();
+ int failures = 0;
writeLock.lock();
try {
- final List<String> swapLocations;
+ final List<String> swapLocationsFromSwapManager;
try {
- swapLocations =
swapManager.recoverSwapLocations(flowFileQueue, swapPartitionName);
+ swapLocationsFromSwapManager =
swapManager.recoverSwapLocations(flowFileQueue, swapPartitionName);
} catch (final IOException ioe) {
logger.error("Failed to determine whether or not any Swap
Files exist for FlowFile Queue {}", getQueueIdentifier());
logger.error("", ioe);
@@ -832,7 +838,14 @@ public class SwappablePriorityQueue {
return null;
}
- logger.debug("Recovered {} Swap Files for {}: {}",
swapLocations.size(), flowFileQueue, swapLocations);
+ // If we have a duplicate of any of the swap locations that we
already know about, we need to filter those out now.
+ // This can happen when, upon startup, we need to swap data out
during the swap file recovery. In this case, we do
+ // not want to include such a swap file in those that we recover,
because those have already been accounted for when
+ // they were added to the queue, before being swapped out.
+ final Set<String> swapLocations = new
LinkedHashSet<>(swapLocationsFromSwapManager);
+ swapLocations.removeAll(this.swapLocations);
+
+ logger.debug("Swap Manager reports {} Swap Files for {}: {}",
swapLocations.size(), flowFileQueue, swapLocations);
for (final String swapLocation : swapLocations) {
try {
final SwapSummary summary =
swapManager.getSwapSummary(swapLocation);
@@ -848,7 +861,8 @@ public class SwappablePriorityQueue {
swapByteCount += queueSize.getByteCount();
resourceClaims.addAll(summary.getResourceClaims());
} catch (final IOException ioe) {
- logger.error("Failed to recover FlowFiles from Swap File
{}; the file appears to be corrupt", swapLocation, ioe.toString());
+ failures++;
+ logger.error("Failed to recover FlowFiles from Swap File
{}; the file appears to be corrupt", swapLocation);
logger.error("", ioe);
if (eventReporter != null) {
eventReporter.reportEvent(Severity.ERROR, "FlowFile
Swapping", "Failed to recover FlowFiles from Swap File " + swapLocation +
@@ -863,9 +877,11 @@ public class SwappablePriorityQueue {
writeLock.unlock("Recover Swap Files");
}
- if (!swapLocations.isEmpty()) {
+ if (swapLocations.isEmpty()) {
+ logger.debug("No swap files were recovered for {}", flowFileQueue);
+ } else {
final long millis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- logger.info("Recovered {} swap files for {} in {} millis",
swapLocations.size(), this, millis);
+ logger.info("Recovered {} swap files for {} in {} millis",
swapLocations.size() - failures, this, millis);
}
return new StandardSwapSummary(new QueueSize(swapFlowFileCount,
swapByteCount), maxId, resourceClaims);
@@ -943,8 +959,14 @@ public class SwappablePriorityQueue {
writeLock.lock();
try {
putAll(queueContents.getActiveFlowFiles());
- swapLocations.addAll(queueContents.getSwapLocations());
+
+ final List<String> inheritedSwapLocations =
queueContents.getSwapLocations();
+ swapLocations.addAll(inheritedSwapLocations);
incrementSwapQueueSize(queueContents.getSwapSize().getObjectCount(),
queueContents.getSwapSize().getByteCount(),
queueContents.getSwapLocations().size());
+
+ if (!inheritedSwapLocations.isEmpty()) {
+ logger.debug("Inherited the following swap locations: {}",
inheritedSwapLocations);
+ }
} finally {
writeLock.unlock("inheritQueueContents");
}
@@ -985,6 +1007,7 @@ public class SwappablePriorityQueue {
updated = updateSize(currentSize, updatedSize);
} while (!updated);
+ logger.debug("Cleared {} to package FlowFile for rebalance to {}",
this, newPartitionName);
return new FlowFileQueueContents(activeRecords,
updatedSwapLocations, swapSize);
} finally {
writeLock.unlock("packageForRebalance(SwappablePriorityQueue)");
@@ -993,6 +1016,6 @@ public class SwappablePriorityQueue {
@Override
public String toString() {
- return "SwappablePriorityQueue[queueId=" +
flowFileQueue.getIdentifier() + "]";
+ return "SwappablePriorityQueue[queueId=" +
flowFileQueue.getIdentifier() + ", partition=" + swapPartitionName + "]";
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 12ee8d8..f7b2f38 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -744,6 +744,7 @@ public class SocketLoadBalancedFlowFileQueue extends
AbstractFlowFileQueue imple
putAndGetPartition(flowFile);
}
+
protected QueuePartition putAndGetPartition(final FlowFileRecord flowFile)
{
final QueuePartition partition;
@@ -1160,5 +1161,10 @@ public class SocketLoadBalancedFlowFileQueue extends
AbstractFlowFileQueue imple
}
}
}
+
+ @Override
+ public String toString() {
+ return "FlowFileQueue[id=" + getIdentifier() + ", Load Balance
Strategy=" + getLoadBalanceStrategy() + ", size=" + size() + "]";
+ }
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java
index 5c8073a..5636e22 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java
@@ -79,7 +79,7 @@ public class NioAsyncLoadBalanceClientTask implements
Runnable {
}
} catch (final Exception e) {
eventReporter.reportEvent(Severity.ERROR,
EVENT_CATEGORY, "Failed to communicate with Peer "
- + client.getNodeIdentifier() + " while trying
to load balance data across the cluster due to " + e.toString());
+ + client.getNodeIdentifier() + " while trying to
load balance data across the cluster due to " + e.toString());
logger.error("Failed to communicate with Peer {} while
trying to load balance data across the cluster.", client.getNodeIdentifier(),
e);
}
@@ -90,6 +90,9 @@ public class NioAsyncLoadBalanceClientTask implements
Runnable {
logger.trace("Was unable to communicate with any client.
Will sleep for 10 milliseconds.");
Thread.sleep(10L);
}
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ return;
} catch (final Exception e) {
logger.error("Failed to communicate with peer while trying to
load balance data across the cluster", e);
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY,
"Failed to comunicate with Peer while trying to load balance data across the
cluster due to " + e);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 05dbf88..1337511 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.controller.repository;
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
@@ -102,7 +103,7 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
private final int numPartitions;
private final ScheduledExecutorService checkpointExecutor;
- private final Set<String> swapLocationSuffixes = new HashSet<>(); //
guraded by synchronizing on object itself
+ private final Set<String> swapLocationSuffixes = new HashSet<>(); //
guarded by synchronizing on object itself
// effectively final
private WriteAheadRepository<RepositoryRecord> wal;
@@ -288,7 +289,7 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
@Override
public boolean isValidSwapLocationSuffix(final String swapLocationSuffix) {
synchronized (swapLocationSuffixes) {
- return swapLocationSuffixes.contains(swapLocationSuffix);
+ return
swapLocationSuffixes.contains(normalizeSwapLocation(swapLocationSuffix));
}
}
@@ -370,8 +371,8 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
// If we have swapped files in or out, we need to ensure that we
update our swapLocationSuffixes.
if (!swapLocationsAdded.isEmpty() || !swapLocationsRemoved.isEmpty()) {
synchronized (swapLocationSuffixes) {
- swapLocationsRemoved.forEach(loc ->
swapLocationSuffixes.remove(getLocationSuffix(loc)));
- swapLocationsAdded.forEach(loc ->
swapLocationSuffixes.add(getLocationSuffix(loc)));
+ swapLocationsRemoved.forEach(loc ->
swapLocationSuffixes.remove(normalizeSwapLocation(loc)));
+ swapLocationsAdded.forEach(loc ->
swapLocationSuffixes.add(normalizeSwapLocation(loc)));
}
}
@@ -415,19 +416,26 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
}
- protected static String getLocationSuffix(final String swapLocation) {
+ protected static String normalizeSwapLocation(final String swapLocation) {
if (swapLocation == null) {
return null;
}
final String normalizedPath = swapLocation.replace("\\", "/");
final String withoutTrailing = (normalizedPath.endsWith("/") &&
normalizedPath.length() > 1) ? normalizedPath.substring(0,
normalizedPath.length() - 1) : normalizedPath;
- final int lastIndex = withoutTrailing.lastIndexOf("/");
- if (lastIndex < 0 || lastIndex >= withoutTrailing.length() - 1) {
- return withoutTrailing;
+ final String pathRemoved = getLocationSuffix(withoutTrailing);
+
+ final String normalized = StringUtils.substringBefore(pathRemoved,
".");
+ return normalized;
+ }
+
+ private static String getLocationSuffix(final String swapLocation) {
+ final int lastIndex = swapLocation.lastIndexOf("/");
+ if (lastIndex < 0 || lastIndex >= swapLocation.length() - 1) {
+ return swapLocation;
}
- return withoutTrailing.substring(lastIndex + 1);
+ return swapLocation.substring(lastIndex + 1);
}
@Override
@@ -486,7 +494,7 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
wal.update(repoRecords, true);
synchronized (this.swapLocationSuffixes) {
- this.swapLocationSuffixes.add(getLocationSuffix(swapLocation));
+ this.swapLocationSuffixes.add(normalizeSwapLocation(swapLocation));
}
logger.info("Successfully swapped out {} FlowFiles from {} to Swap
File {}", new Object[]{swappedOut.size(), queue, swapLocation});
@@ -507,7 +515,7 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
updateRepository(repoRecords, true);
synchronized (this.swapLocationSuffixes) {
- this.swapLocationSuffixes.add(getLocationSuffix(swapLocation));
+ this.swapLocationSuffixes.add(normalizeSwapLocation(swapLocation));
}
logger.info("Repository updated to reflect that {} FlowFiles were
swapped in to {}", new Object[]{swapRecords.size(), queue});
@@ -633,7 +641,7 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
final Set<String> recoveredSwapLocations =
wal.getRecoveredSwapLocations();
synchronized (this.swapLocationSuffixes) {
- recoveredSwapLocations.forEach(loc ->
this.swapLocationSuffixes.add(getLocationSuffix(loc)));
+ recoveredSwapLocations.forEach(loc ->
this.swapLocationSuffixes.add(normalizeSwapLocation(loc)));
logger.debug("Recovered {} Swap Files: {}",
swapLocationSuffixes.size(), swapLocationSuffixes);
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
index dbdbb6b..6a6eb63 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
@@ -89,6 +89,10 @@ public class StandardStateManagerProvider implements
StateManagerProvider{
return provider;
}
+ public static synchronized void resetProvider() {
+ provider = null;
+ }
+
private static StateProvider createLocalStateProvider(final NiFiProperties
properties, final VariableRegistry variableRegistry, final ExtensionManager
extensionManager)
throws IOException, ConfigParseException {
final File configFile = properties.getStateManagementConfigFile();
@@ -420,26 +424,26 @@ public class StandardStateManagerProvider implements
StateManagerProvider{
try {
mgr.clear(Scope.CLUSTER);
} catch (final Exception e) {
- logger.warn("Component with ID {} was removed from NiFi instance
but failed to clear clustered state for the component", e);
+ logger.warn("Component with ID {} was removed from NiFi instance
but failed to clear clustered state for the component", componentId, e);
}
try {
mgr.clear(Scope.LOCAL);
} catch (final Exception e) {
- logger.warn("Component with ID {} was removed from NiFi instance
but failed to clear local state for the component", e);
+ logger.warn("Component with ID {} was removed from NiFi instance
but failed to clear local state for the component", componentId, e);
}
try {
localStateProvider.onComponentRemoved(componentId);
} catch (final Exception e) {
- logger.warn("Component with ID {} was removed from NiFi instance
but failed to cleanup resources used to maintain its local state", e);
+ logger.warn("Component with ID {} was removed from NiFi instance
but failed to cleanup resources used to maintain its local state", componentId,
e);
}
if (clusterStateProvider != null) {
try {
clusterStateProvider.onComponentRemoved(componentId);
} catch (final Exception e) {
- logger.warn("Component with ID {} was removed from NiFi
instance but failed to cleanup resources used to maintain its clustered state",
e);
+ logger.warn("Component with ID {} was removed from NiFi
instance but failed to cleanup resources used to maintain its clustered state",
componentId, e);
}
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index 4faba26..964744b 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -398,17 +398,17 @@ public class TestWriteAheadFlowFileRepository {
@Test
- public void testGetLocationSuffix() {
- assertEquals("/", WriteAheadFlowFileRepository.getLocationSuffix("/"));
- assertEquals("", WriteAheadFlowFileRepository.getLocationSuffix(""));
- assertEquals(null,
WriteAheadFlowFileRepository.getLocationSuffix(null));
- assertEquals("test.txt",
WriteAheadFlowFileRepository.getLocationSuffix("test.txt"));
- assertEquals("test.txt",
WriteAheadFlowFileRepository.getLocationSuffix("/test.txt"));
- assertEquals("test.txt",
WriteAheadFlowFileRepository.getLocationSuffix("/tmp/test.txt"));
- assertEquals("test.txt",
WriteAheadFlowFileRepository.getLocationSuffix("//test.txt"));
- assertEquals("test.txt",
WriteAheadFlowFileRepository.getLocationSuffix("/path/to/other/file/repository/test.txt"));
- assertEquals("test.txt",
WriteAheadFlowFileRepository.getLocationSuffix("test.txt/"));
- assertEquals("test.txt",
WriteAheadFlowFileRepository.getLocationSuffix("/path/to/test.txt/"));
+ public void testNormalizeSwapLocation() {
+ assertEquals("/",
WriteAheadFlowFileRepository.normalizeSwapLocation("/"));
+ assertEquals("",
WriteAheadFlowFileRepository.normalizeSwapLocation(""));
+ assertEquals(null,
WriteAheadFlowFileRepository.normalizeSwapLocation(null));
+ assertEquals("test",
WriteAheadFlowFileRepository.normalizeSwapLocation("test.txt"));
+ assertEquals("test",
WriteAheadFlowFileRepository.normalizeSwapLocation("/test.txt"));
+ assertEquals("test",
WriteAheadFlowFileRepository.normalizeSwapLocation("/tmp/test.txt"));
+ assertEquals("test",
WriteAheadFlowFileRepository.normalizeSwapLocation("//test.txt"));
+ assertEquals("test",
WriteAheadFlowFileRepository.normalizeSwapLocation("/path/to/other/file/repository/test.txt"));
+ assertEquals("test",
WriteAheadFlowFileRepository.normalizeSwapLocation("test.txt/"));
+ assertEquals("test",
WriteAheadFlowFileRepository.normalizeSwapLocation("/path/to/test.txt/"));
}
@Test
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
index 157f9d1..6826b59 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
@@ -20,6 +20,13 @@ import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.NodeProtocolSender;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.components.state.StateProvider;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.connectable.Connection;
@@ -28,12 +35,13 @@ import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.FileSystemSwapManager;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.StandardFlowSynchronizer;
import org.apache.nifi.controller.StandardSnippet;
import org.apache.nifi.controller.flow.StandardFlowManager;
+import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager;
+import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.queue.ConnectionEventListener;
import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.queue.FlowFileQueueFactory;
-import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.queue.StandardFlowFileQueue;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FileSystemRepository;
@@ -47,15 +55,14 @@ import
org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.RepositoryStatusReport;
import org.apache.nifi.controller.repository.WriteAheadFlowFileRepository;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.scheduling.SchedulingAgent;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
+import org.apache.nifi.controller.serialization.FlowSynchronizer;
import org.apache.nifi.controller.service.ControllerServiceNode;
-import org.apache.nifi.controller.service.ControllerServiceProvider;
-import org.apache.nifi.controller.service.StandardControllerServiceProvider;
+import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
import
org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import
org.apache.nifi.controller.status.history.VolatileComponentStatusRepository;
@@ -65,8 +72,15 @@ import org.apache.nifi.events.VolatileBulletinRepository;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.integration.processor.BiConsumerProcessor;
+import org.apache.nifi.integration.processors.GenerateProcessor;
+import org.apache.nifi.integration.processors.NopProcessor;
+import org.apache.nifi.integration.processors.TerminateAll;
+import org.apache.nifi.integration.processors.TerminateOnce;
+import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.SystemBundle;
+import org.apache.nifi.persistence.FlowConfigurationDAO;
+import org.apache.nifi.persistence.StandardXMLFlowConfigurationDAO;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
@@ -78,19 +92,28 @@ import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.provenance.WriteAheadProvenanceRepository;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.StandardFlowRegistryClient;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FileUtils;
import org.apache.nifi.util.NiFiProperties;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
import org.junit.rules.Timeout;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -100,38 +123,56 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class FrameworkIntegrationTest {
+ private static final Logger logger =
LoggerFactory.getLogger(FrameworkIntegrationTest.class);
+
//@Rule
public Timeout globalTimeout = Timeout.seconds(20);
+ @Rule
+ public TestName name = new TestName();
+
+
private ResourceClaimManager resourceClaimManager;
private StandardProcessScheduler processScheduler;
private FlowEngine flowEngine;
private FlowController flowController;
- private FlowRegistryClient flowRegistryClient = null;
+ private FlowRegistryClient flowRegistryClient = new
StandardFlowRegistryClient();
private ProcessorNode nopProcessor;
private ProcessorNode terminateProcessor;
private ProcessorNode terminateAllProcessor;
- private FlowFileQueueFactory flowFileQueueFactory;
private FlowFileSwapManager flowFileSwapManager;
private DirectInjectionExtensionManager extensionManager;
private ProcessGroup rootProcessGroup;
private Bundle systemBundle;
+ private ClusterCoordinator clusterCoordinator;
+ private NiFiProperties nifiProperties;
public static final Relationship REL_SUCCESS = new
Relationship.Builder().name("success").build();
@Before
public void setup() throws IOException {
+ StandardStateManagerProvider.resetProvider();
+
cleanup();
- initialize(null);
+ initialize();
+
+ flowController.initializeFlow();
+ createFlow();
}
protected String getNiFiPropertiesFilename() {
- return "src/test/resources/int-tests/default-nifi.properties";
+ if (isClusteredTest()) {
+ return "src/test/resources/int-tests/clustered-nifi.properties";
+ } else {
+ return "src/test/resources/int-tests/default-nifi.properties";
+ }
}
protected Map<String, String> getNiFiPropertiesOverrides() {
@@ -142,14 +183,20 @@ public class FrameworkIntegrationTest {
// Placeholder for subclasses.
}
- protected final void initialize(final QueueProvider queueProvider) throws
IOException {
- final NiFiProperties nifiProperties =
NiFiProperties.createBasicNiFiProperties(getNiFiPropertiesFilename(),
getNiFiPropertiesOverrides());
- initialize(nifiProperties, queueProvider);
+ protected final void initialize() throws IOException {
+ final Map<String, String> propertyOverrides = new
HashMap<>(getNiFiPropertiesOverrides());
+ if (isClusteredTest()) {
+ propertyOverrides.put(NiFiProperties.CLUSTER_IS_NODE, "true");
+ }
+
+ final NiFiProperties nifiProperties =
NiFiProperties.createBasicNiFiProperties(getNiFiPropertiesFilename(),
propertyOverrides);
+ initialize(nifiProperties);
}
- protected final void initialize(final NiFiProperties nifiProperties, final
QueueProvider queueProvider) throws IOException {
+ protected final void initialize(final NiFiProperties nifiProperties)
throws IOException {
+ this.nifiProperties = nifiProperties;
+
final FlowFileEventRepository flowFileEventRepository = new
RingBufferEventRepository(5);
- resourceClaimManager = new StandardResourceClaimManager();
final BulletinRepository bulletinRepo = new
VolatileBulletinRepository();
flowEngine = new FlowEngine(4, "unit test flow engine");
@@ -161,7 +208,12 @@ public class FrameworkIntegrationTest {
extensionManager.injectExtensionType(StateProvider.class,
WriteAheadLocalStateProvider.class);
extensionManager.injectExtensionType(ComponentStatusRepository.class,
VolatileComponentStatusRepository.class);
extensionManager.injectExtensionType(FlowFileSwapManager.class,
FileSystemSwapManager.class);
+
extensionManager.injectExtensionType(Processor.class,
BiConsumerProcessor.class);
+ extensionManager.injectExtensionType(Processor.class,
GenerateProcessor.class);
+ extensionManager.injectExtensionType(Processor.class,
TerminateOnce.class);
+ extensionManager.injectExtensionType(Processor.class,
TerminateAll.class);
+ extensionManager.injectExtensionType(Processor.class,
NopProcessor.class);
injectExtensionTypes(extensionManager);
systemBundle = SystemBundle.create(nifiProperties);
@@ -171,8 +223,48 @@ public class FrameworkIntegrationTest {
final Authorizer authorizer = new AlwaysAuthorizedAuthorizer();
final AuditService auditService = new NopAuditService();
- flowController =
FlowController.createStandaloneInstance(flowFileEventRepository,
nifiProperties, authorizer, auditService, encryptor, bulletinRepo,
- VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY, flowRegistryClient,
extensionManager);
+ if (isClusteredTest()) {
+ final File zookeeperDir = new File("target/state/zookeeper");
+ final File version2Dir = new File(zookeeperDir, "version-2");
+
+ if (!version2Dir.exists()) {
+ assertTrue(version2Dir.mkdirs());
+ }
+
+ final File[] children = version2Dir.listFiles();
+ if (children != null) {
+ for (final File file : children) {
+ FileUtils.deleteFile(file, true);
+ }
+ }
+
+ clusterCoordinator = Mockito.mock(ClusterCoordinator.class);
+ final HeartbeatMonitor heartbeatMonitor =
Mockito.mock(HeartbeatMonitor.class);
+ final NodeProtocolSender protocolSender =
Mockito.mock(NodeProtocolSender.class);
+ final LeaderElectionManager leaderElectionManager = new
CuratorLeaderElectionManager(2, nifiProperties);
+
+ final NodeIdentifier localNodeId = new
NodeIdentifier(UUID.randomUUID().toString(), "localhost", 8111, "localhost",
8081,
+ "localhost", 8082, "localhost", 8083, 8084, false,
Collections.emptySet());
+ final NodeIdentifier node2Id = new
NodeIdentifier(UUID.randomUUID().toString(), "localhost", 8222, "localhost",
8081,
+ "localhost", 8082, "localhost", 8083, 8084, false,
Collections.emptySet());
+
+ final Set<NodeIdentifier> nodeIdentifiers = new HashSet<>();
+ nodeIdentifiers.add(localNodeId);
+ nodeIdentifiers.add(node2Id);
+
Mockito.when(clusterCoordinator.getNodeIdentifiers()).thenReturn(nodeIdentifiers);
+
Mockito.when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(localNodeId);
+
+ flowController =
FlowController.createClusteredInstance(flowFileEventRepository, nifiProperties,
authorizer, auditService, encryptor, protocolSender, bulletinRepo,
clusterCoordinator,
+ heartbeatMonitor, leaderElectionManager,
VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY, flowRegistryClient,
extensionManager);
+
+ flowController.setClustered(true, UUID.randomUUID().toString());
+ flowController.setNodeId(localNodeId);
+
+ flowController.setConnectionStatus(new
NodeConnectionStatus(localNodeId, NodeConnectionState.CONNECTED));
+ } else {
+ flowController =
FlowController.createStandaloneInstance(flowFileEventRepository,
nifiProperties, authorizer, auditService, encryptor, bulletinRepo,
+ VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY,
flowRegistryClient, extensionManager);
+ }
processScheduler = new StandardProcessScheduler(flowEngine,
flowController, encryptor, flowController.getStateManagerProvider(),
nifiProperties);
@@ -180,49 +272,85 @@ public class FrameworkIntegrationTest {
final SchedulingAgent timerDrivenSchedulingAgent = new
TimerDrivenSchedulingAgent(flowController, flowEngine,
repositoryContextFactory, encryptor, nifiProperties);
processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN,
timerDrivenSchedulingAgent);
- final ControllerServiceProvider controllerServiceProvider = new
StandardControllerServiceProvider(flowController, processScheduler,
bulletinRepo);
+ flowFileSwapManager = flowController.createSwapManager();
+ resourceClaimManager = flowController.getResourceClaimManager();
+ }
+ protected void createFlow() {
rootProcessGroup =
flowController.getFlowManager().createProcessGroup(UUID.randomUUID().toString());
+ rootProcessGroup.setName("Integration Test");
+
((StandardFlowManager)
flowController.getFlowManager()).setRootGroup(rootProcessGroup);
- nopProcessor = createProcessorNode((context, session) -> {});
+ nopProcessor = createProcessorNode(NopProcessor.class);
+ terminateProcessor = createProcessorNode(TerminateOnce.class);
+ terminateAllProcessor = createProcessorNode(TerminateAll.class);
+ }
- terminateProcessor = createProcessorNode((context, session) -> {
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
+ protected boolean isClusteredTest() {
+ return false;
+ }
+
+ protected ClusterCoordinator getClusterCoordinator() {
+ return clusterCoordinator;
+ }
- session.remove(flowFile);
- });
+ @After
+ public final void shutdown() {
+ logger.info("Shutting down...");
- terminateAllProcessor = createProcessorNode((context, session) -> {
- FlowFile flowFile;
- while ((flowFile = session.get()) != null) {
- session.remove(flowFile);
- }
- });
+ if (flowController != null) {
+ flowController.shutdown(true);
+ }
- flowFileSwapManager = flowController.createSwapManager();
- flowFileQueueFactory = new FlowFileQueueFactory() {
- @Override
- public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy
loadBalanceStrategy, final String partitioningAttribute, final
ConnectionEventListener connectionEventListener) {
- return
FrameworkIntegrationTest.this.createFlowFileQueue(UUID.randomUUID().toString());
- }
- };
+ if (flowEngine != null) {
+ flowEngine.shutdownNow();
+ }
- if (queueProvider == null) {
- flowController.initializeFlow();
- } else {
- flowController.initializeFlow(queueProvider);
+ if (processScheduler != null) {
+ processScheduler.shutdown();
}
}
- @After
- public final void shutdown() {
+ protected void restart() throws IOException, ExecutionException,
InterruptedException {
+ logger.info("Shutting down for restart....");
+
+ // Save Flow to a byte array
+ final FlowConfigurationDAO flowDao = new
StandardXMLFlowConfigurationDAO(Paths.get("target/int-tests/flow.xml.gz"),
flowController.getEncryptor(), nifiProperties, getExtensionManager());
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ flowDao.save(flowController, baos);
+ final byte[] flowBytes = baos.toByteArray();
+
+ // Shutdown
flowController.shutdown(true);
flowEngine.shutdownNow();
- processScheduler.shutdown();
+ StandardStateManagerProvider.resetProvider();
+
+ // Remove all Log Repositories so that we can restart with the same
ID's
+ for (final ProcessorNode procNode : rootProcessGroup.getProcessors()) {
+ LogRepositoryFactory.removeRepository(procNode.getIdentifier());
+ }
+
+ // Re-initialize the framework components
+ initialize();
+
+ // Reload the flow
+ final FlowSynchronizer flowSynchronizer = new
StandardFlowSynchronizer(flowController.getEncryptor(), nifiProperties,
extensionManager);
+ flowController.synchronize(flowSynchronizer, new
StandardDataFlow(flowBytes, null, null, Collections.emptySet()));
+
+ // Reload FlowFiles / initialize flow
+ final ProcessGroup newRootGroup =
flowController.getFlowManager().getRootGroup();
+ rootProcessGroup = newRootGroup;
+ final QueueProvider queueProvider = new QueueProvider() {
+ @Override
+ public Collection<FlowFileQueue> getAllQueues() {
+ return newRootGroup.findAllConnections().stream()
+ .map(Connection::getFlowFileQueue)
+ .collect(Collectors.toList());
+ }
+ };
+
+ flowController.initializeFlow(queueProvider);
}
@After
@@ -325,23 +453,26 @@ public class FrameworkIntegrationTest {
return processorNode;
}
- protected final void connect(final ProcessorNode source, final
ProcessorNode destination, final Relationship relationship) {
- connect(source, destination, Collections.singleton(relationship));
+ protected final Connection connect(final ProcessorNode source, final
ProcessorNode destination, final Relationship relationship) {
+ return connect(source, destination,
Collections.singleton(relationship));
}
- protected final void connect(final ProcessorNode source, final
ProcessorNode destination, final Collection<Relationship> relationships) {
+ protected final Connection connect(final ProcessorNode source, final
ProcessorNode destination, final Collection<Relationship> relationships) {
+ final String id = UUID.randomUUID().toString();
final Connection connection = new
StandardConnection.Builder(processScheduler)
.source(source)
.destination(destination)
.relationships(relationships)
- .id(UUID.randomUUID().toString())
+ .id(id)
.clustered(false)
- .flowFileQueueFactory(flowFileQueueFactory)
+ .flowFileQueueFactory((loadBalanceStrategy, partitioningAttribute,
eventListener) -> createFlowFileQueue(id))
.build();
source.addConnection(connection);
destination.addConnection(connection);
rootProcessGroup.addConnection(connection);
+
+ return connection;
}
protected final Future<Void> start(final ProcessorNode procNode) {
@@ -380,7 +511,7 @@ public class FrameworkIntegrationTest {
return getRepositoryContext().getProvenanceRepository();
}
- private RepositoryContext getRepositoryContext() {
+ protected RepositoryContext getRepositoryContext() {
return
flowController.getRepositoryContextFactory().newProcessContext(nopProcessor,
new AtomicLong(0L));
}
@@ -414,31 +545,28 @@ public class FrameworkIntegrationTest {
protected void triggerOnce(final ProcessorNode processor) throws
ExecutionException, InterruptedException {
final String schedulingPeriod = processor.getSchedulingPeriod();
- try {
- final FlowFileEvent initialReport = getStatusReport(processor);
- final int initialInvocations = (initialReport == null) ? 0 :
initialReport.getInvocations();
+ final FlowFileEvent initialReport = getStatusReport(processor);
+ final int initialInvocations = (initialReport == null) ? 0 :
initialReport.getInvocations();
- processor.setScheduldingPeriod("1 hour");
-
- // We will only trigger the Processor to run once per hour. So we
need to ensure that
- // we don't trigger the Processor while it's yielded. So if its
yield expiration is in the future,
- // wait until the yield expires.
- while (processor.getYieldExpiration() >
System.currentTimeMillis()) {
- Thread.sleep(1L);
- }
+ processor.setScheduldingPeriod("1 hour");
- start(processor).get();
+ // We will only trigger the Processor to run once per hour. So we need
to ensure that
+ // we don't trigger the Processor while it's yielded. So if its yield
expiration is in the future,
+ // wait until the yield expires.
+ while (processor.getYieldExpiration() > System.currentTimeMillis()) {
+ Thread.sleep(1L);
+ }
- int totalInvocations = initialInvocations;
- while (totalInvocations < initialInvocations + 1) {
- final FlowFileEvent currentReport = getStatusReport(processor);
- totalInvocations = currentReport == null ? 0 :
currentReport.getInvocations();
- }
+ start(processor).get();
- stop(processor).get();
- } finally {
- processor.setScheduldingPeriod(schedulingPeriod);
+ int totalInvocations = initialInvocations;
+ while (totalInvocations < initialInvocations + 1) {
+ final FlowFileEvent currentReport = getStatusReport(processor);
+ totalInvocations = currentReport == null ? 0 :
currentReport.getInvocations();
}
+
+ stop(processor).get();
+ processor.setScheduldingPeriod(schedulingPeriod);
}
protected FlowFileEvent getStatusReport(final ProcessorNode processor) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/lifecycle/FlowFileRepositoryLifecycleIT.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/lifecycle/FlowFileRepositoryLifecycleIT.java
index 257111b..c1135ad 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/lifecycle/FlowFileRepositoryLifecycleIT.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/lifecycle/FlowFileRepositoryLifecycleIT.java
@@ -75,7 +75,8 @@ public class FlowFileRepositoryLifecycleIT extends
FrameworkIntegrationTest {
shutdown();
final FlowFileQueue restoredQueue =
createFlowFileQueue(queue.getIdentifier());
- initialize(() -> Collections.singleton(restoredQueue));
+ initialize();
+ getFlowController().initializeFlow(() ->
Collections.singleton(restoredQueue));
for (int i=0; i < queueSize; i++) {
final FlowFileRecord flowFileRecord =
restoredQueue.poll(Collections.emptySet());
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processor/BiConsumerProcessor.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processor/BiConsumerProcessor.java
index 2fe6029..e55473d 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processor/BiConsumerProcessor.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processor/BiConsumerProcessor.java
@@ -22,12 +22,13 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
+import java.util.Collections;
import java.util.Set;
import java.util.function.BiConsumer;
public class BiConsumerProcessor extends AbstractProcessor {
private BiConsumer<ProcessContext, ProcessSession> trigger;
- private Set<Relationship> relationships;
+ private volatile Set<Relationship> relationships;
public void setTrigger(final BiConsumer<ProcessContext, ProcessSession>
trigger) {
this.trigger = trigger;
@@ -39,8 +40,13 @@ public class BiConsumerProcessor extends AbstractProcessor {
@Override
public Set<Relationship> getRelationships() {
- if (relationships == null) {
- throw new IllegalStateException("Relationships have not been
initialized");
+ while (relationships == null) {
+ try {
+ Thread.sleep(1L);
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ return Collections.emptySet();
+ }
}
return relationships;
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processors/GenerateProcessor.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processors/GenerateProcessor.java
new file mode 100644
index 0000000..1a67871
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processors/GenerateProcessor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.integration.processors;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static
org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR;
+import static
org.apache.nifi.processor.util.StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR;
+
+public class GenerateProcessor extends AbstractProcessor {
+ public static final PropertyDescriptor COUNT = new Builder()
+ .name("Count")
+ .displayName("Count")
+ .description("Number of FlowFiles to generate")
+ .required(true)
+ .addValidator(NON_NEGATIVE_INTEGER_VALIDATOR)
+ .defaultValue("1")
+ .build();
+
+ public static final PropertyDescriptor CONTENT_SIZE = new Builder()
+ .name("Content Size")
+ .displayName("Content Size")
+ .description("Size of the FlowFile")
+ .required(true)
+ .addValidator(DATA_SIZE_VALIDATOR)
+ .defaultValue("0 B")
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .build();
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return Arrays.asList(COUNT, CONTENT_SIZE);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ for (int i=0; i < context.getProperty(COUNT).asInteger(); i++) {
+ FlowFile flowFile = session.create();
+
+ final int size =
context.getProperty(CONTENT_SIZE).asDataSize(DataUnit.B).intValue();
+ final byte[] data = new byte[size];
+ session.write(flowFile, out -> out.write(data));
+ session.transfer(flowFile, REL_SUCCESS);
+ }
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processor/BiConsumerProcessor.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processors/NopProcessor.java
similarity index 53%
copy from
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processor/BiConsumerProcessor.java
copy to
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processors/NopProcessor.java
index 2fe6029..94265c4 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processor/BiConsumerProcessor.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processors/NopProcessor.java
@@ -14,44 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.integration.processor;
+package org.apache.nifi.integration.processors;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import java.util.Set;
-import java.util.function.BiConsumer;
-
-public class BiConsumerProcessor extends AbstractProcessor {
- private BiConsumer<ProcessContext, ProcessSession> trigger;
- private Set<Relationship> relationships;
-
- public void setTrigger(final BiConsumer<ProcessContext, ProcessSession>
trigger) {
- this.trigger = trigger;
- }
-
- public void setRelationships(final Set<Relationship> relationships) {
- this.relationships = relationships;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- if (relationships == null) {
- throw new IllegalStateException("Relationships have not been
initialized");
- }
-
- return relationships;
- }
-
+public class NopProcessor extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- if (trigger == null) {
- throw new IllegalStateException("Trigger has not been
initialized");
- }
-
- trigger.accept(context, session);
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processor/BiConsumerProcessor.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processors/TerminateAll.java
similarity index 54%
copy from
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processor/BiConsumerProcessor.java
copy to
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processors/TerminateAll.java
index 2fe6029..4295f28 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processor/BiConsumerProcessor.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processors/TerminateAll.java
@@ -14,44 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.integration.processor;
+package org.apache.nifi.integration.processors;
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import java.util.Set;
-import java.util.function.BiConsumer;
-
-public class BiConsumerProcessor extends AbstractProcessor {
- private BiConsumer<ProcessContext, ProcessSession> trigger;
- private Set<Relationship> relationships;
-
- public void setTrigger(final BiConsumer<ProcessContext, ProcessSession>
trigger) {
- this.trigger = trigger;
- }
-
- public void setRelationships(final Set<Relationship> relationships) {
- this.relationships = relationships;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- if (relationships == null) {
- throw new IllegalStateException("Relationships have not been
initialized");
- }
-
- return relationships;
- }
-
+public class TerminateAll extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- if (trigger == null) {
- throw new IllegalStateException("Trigger has not been
initialized");
+ FlowFile flowFile;
+ while ((flowFile = session.get()) != null) {
+ session.remove(flowFile);
+ session.adjustCounter("Removed", 1, false);
}
-
- trigger.accept(context, session);
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processor/BiConsumerProcessor.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processors/TerminateOnce.java
similarity index 54%
copy from
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processor/BiConsumerProcessor.java
copy to
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processors/TerminateOnce.java
index 2fe6029..14ed476 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processor/BiConsumerProcessor.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processors/TerminateOnce.java
@@ -14,44 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.integration.processor;
+package org.apache.nifi.integration.processors;
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import java.util.Set;
-import java.util.function.BiConsumer;
-
-public class BiConsumerProcessor extends AbstractProcessor {
- private BiConsumer<ProcessContext, ProcessSession> trigger;
- private Set<Relationship> relationships;
-
- public void setTrigger(final BiConsumer<ProcessContext, ProcessSession>
trigger) {
- this.trigger = trigger;
- }
-
- public void setRelationships(final Set<Relationship> relationships) {
- this.relationships = relationships;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- if (relationships == null) {
- throw new IllegalStateException("Relationships have not been
initialized");
- }
-
- return relationships;
- }
-
+public class TerminateOnce extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- if (trigger == null) {
- throw new IllegalStateException("Trigger has not been
initialized");
+ FlowFile flowFile = session.get();
+ if (flowFile != null) {
+ session.remove(flowFile);
}
-
- trigger.accept(context, session);
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/ClusteredSwapFileIT.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/ClusteredSwapFileIT.java
new file mode 100644
index 0000000..3f6027a
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/ClusteredSwapFileIT.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.integration.swap;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.queue.ConnectionEventListener;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
+import org.apache.nifi.controller.queue.QueueDiagnostics;
+import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics;
+import
org.apache.nifi.controller.queue.clustered.SocketLoadBalancedFlowFileQueue;
+import
org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.integration.FrameworkIntegrationTest;
+import org.apache.nifi.integration.processors.GenerateProcessor;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+public class ClusteredSwapFileIT extends FrameworkIntegrationTest {
+
+ @Test
+ public void
testSwapOnRestartWithLoadBalancedConnectionDoNotLoadBalanceStrategy() throws
ExecutionException, InterruptedException, IOException {
+ final ProcessorNode generator =
createProcessorNode(GenerateProcessor.class);
+
generator.setProperties(Collections.singletonMap(GenerateProcessor.COUNT.getName(),
"60000"));
+
+ Connection connection = connect(generator, getTerminateAllProcessor(),
REL_SUCCESS);
+ triggerOnce(generator);
+
+ FlowFileQueue queue = connection.getFlowFileQueue();
+ QueueDiagnostics diagnostics = queue.getQueueDiagnostics();
+ LocalQueuePartitionDiagnostics localPartitionDiagnostics =
diagnostics.getLocalQueuePartitionDiagnostics();
+
+ assertEquals(20_000,
localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
+ assertEquals(4, localPartitionDiagnostics.getSwapFileCount());
+ assertEquals(40_000,
localPartitionDiagnostics.getSwapQueueSize().getObjectCount());
+ assertEquals(60_000, queue.size().getObjectCount());
+
+ // restart nifi
+ restart();
+
+ // get the new Connection with the same ID
+ connection = getRootGroup().getConnection(connection.getIdentifier());
+ queue = connection.getFlowFileQueue();
+ diagnostics = queue.getQueueDiagnostics();
+ localPartitionDiagnostics =
diagnostics.getLocalQueuePartitionDiagnostics();
+
+ // Ensure we have the correct queue sizes
+ assertEquals(20_000,
localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
+ assertEquals(4, localPartitionDiagnostics.getSwapFileCount());
+ assertEquals(40_000,
localPartitionDiagnostics.getSwapQueueSize().getObjectCount());
+ assertEquals(60_000, queue.size().getObjectCount());
+
+ // Consume all the data
+ for (int i=0; i < 60_000; i++) {
+ final FlowFileRecord flowFile = queue.poll(Collections.emptySet());
+ assertNotNull(flowFile);
+
+ queue.acknowledge(flowFile);
+ }
+
+ assertNull(queue.poll(Collections.emptySet()));
+
+ // Check queue sizes again
+ diagnostics = queue.getQueueDiagnostics();
+ localPartitionDiagnostics =
diagnostics.getLocalQueuePartitionDiagnostics();
+
+ assertEquals(0,
localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
+ assertEquals(0, localPartitionDiagnostics.getSwapFileCount());
+ assertEquals(0,
localPartitionDiagnostics.getSwapQueueSize().getObjectCount());
+ assertEquals(0, queue.size().getObjectCount());
+ }
+
+
+ @Test
+ public void
testSwapOnRestartWithLoadBalancedConnectionRoundRobinStrategy() throws
ExecutionException, InterruptedException, IOException {
+ final ProcessorNode generator =
createProcessorNode(GenerateProcessor.class);
+
generator.setProperties(Collections.singletonMap(GenerateProcessor.COUNT.getName(),
"60000"));
+
+ Connection connection = connect(generator, getTerminateAllProcessor(),
REL_SUCCESS);
+ FlowFileQueue queue = connection.getFlowFileQueue();
+
+ queue.setLoadBalanceStrategy(LoadBalanceStrategy.ROUND_ROBIN, null);
+
+ final Set<NodeIdentifier> nodeIdentifiers =
getClusterCoordinator().getNodeIdentifiers();
+ ((SocketLoadBalancedFlowFileQueue)
queue).setNodeIdentifiers(nodeIdentifiers, false);
+
+ triggerOnce(generator);
+
+ QueueDiagnostics diagnostics = queue.getQueueDiagnostics();
+ LocalQueuePartitionDiagnostics localPartitionDiagnostics =
diagnostics.getLocalQueuePartitionDiagnostics();
+ RemoteQueuePartitionDiagnostics remotePartitionDiagnostics =
diagnostics.getRemoteQueuePartitionDiagnostics().get(0);
+
+ assertEquals(20_000,
localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
+ assertEquals(1, localPartitionDiagnostics.getSwapFileCount());
+ assertEquals(10_000,
localPartitionDiagnostics.getSwapQueueSize().getObjectCount());
+
+ assertEquals(2, remotePartitionDiagnostics.getSwapFileCount());
+ assertEquals(29_000,
remotePartitionDiagnostics.getSwapQueueSize().getObjectCount());
+ assertEquals(1_000,
remotePartitionDiagnostics.getActiveQueueSize().getObjectCount());
+
+ assertEquals(60_000, queue.size().getObjectCount());
+
+ // restart nifi
+ restart();
+
+ // get the new Connection with the same ID
+ connection = getRootGroup().getConnection(connection.getIdentifier());
+ queue = connection.getFlowFileQueue();
+
+ // Ensure we have the correct queue sizes
+ assertEquals(60_000, queue.size().getObjectCount());
+
+ while (true) {
+ triggerOnce((ProcessorNode) connection.getDestination());
+ FlowFileRecord polled = queue.poll(Collections.emptySet());
+ if (polled == null) {
+ break;
+ }
+
+ queue.acknowledge(polled);
+ }
+
+ // Check queue sizes again
+ diagnostics = queue.getQueueDiagnostics();
+ localPartitionDiagnostics =
diagnostics.getLocalQueuePartitionDiagnostics();
+ remotePartitionDiagnostics =
diagnostics.getRemoteQueuePartitionDiagnostics().get(0);
+
+ assertEquals(0,
localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
+ assertEquals(0, localPartitionDiagnostics.getSwapFileCount());
+ assertEquals(0,
localPartitionDiagnostics.getSwapQueueSize().getObjectCount());
+
+ final int queueCount =
remotePartitionDiagnostics.getActiveQueueSize().getObjectCount() +
remotePartitionDiagnostics.getSwapQueueSize().getObjectCount()
+ +
remotePartitionDiagnostics.getUnacknowledgedQueueSize().getObjectCount();
+
+ assertEquals(queueCount, queue.size().getObjectCount());
+ }
+
+
+ @Test(timeout = 60_000)
+ public void testChangeLoadBalanceStrategyWhileDataSwapped() throws
ExecutionException, InterruptedException, IOException {
+ final ProcessorNode generator =
createProcessorNode(GenerateProcessor.class);
+
generator.setProperties(Collections.singletonMap(GenerateProcessor.COUNT.getName(),
"60000"));
+
+ Connection connection = connect(generator, getTerminateAllProcessor(),
REL_SUCCESS);
+ triggerOnce(generator);
+
+ FlowFileQueue queue = connection.getFlowFileQueue();
+ QueueDiagnostics diagnostics = queue.getQueueDiagnostics();
+ LocalQueuePartitionDiagnostics localPartitionDiagnostics =
diagnostics.getLocalQueuePartitionDiagnostics();
+
+ assertEquals(20_000,
localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
+ assertEquals(4, localPartitionDiagnostics.getSwapFileCount());
+ assertEquals(40_000,
localPartitionDiagnostics.getSwapQueueSize().getObjectCount());
+ assertEquals(60_000, queue.size().getObjectCount());
+
+ queue.setLoadBalanceStrategy(LoadBalanceStrategy.ROUND_ROBIN, null);
+
+ // Consume the 30,000 FlowFiles that can be polled; the other 30,000 remain queued (asserted below)
+ int polled = 0;
+ while (polled < 30_000) {
+ final FlowFileRecord flowFile = queue.poll(Collections.emptySet());
+ if (flowFile != null) {
+ polled++;
+ queue.acknowledge(flowFile);
+ }
+ }
+
+ assertNull(queue.poll(Collections.emptySet()));
+
+ // Check queue sizes again
+ diagnostics = queue.getQueueDiagnostics();
+ localPartitionDiagnostics =
diagnostics.getLocalQueuePartitionDiagnostics();
+
+ assertEquals(0,
localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
+ assertEquals(0, localPartitionDiagnostics.getSwapFileCount());
+ assertEquals(0,
localPartitionDiagnostics.getSwapQueueSize().getObjectCount());
+ assertEquals(30_000, queue.size().getObjectCount());
+
+ queue.setLoadBalanceStrategy(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE,
null);
+
+ // Consume all the data
+ polled = 0;
+ while (polled < 30_000) {
+ final FlowFileRecord flowFile = queue.poll(Collections.emptySet());
+ if (flowFile != null) {
+ polled++;
+ queue.acknowledge(flowFile);
+ }
+ }
+
+ assertNull(queue.poll(Collections.emptySet()));
+
+ // Check queue sizes again
+ diagnostics = queue.getQueueDiagnostics();
+ localPartitionDiagnostics =
diagnostics.getLocalQueuePartitionDiagnostics();
+
+ assertEquals(0,
localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
+ assertEquals(0, localPartitionDiagnostics.getSwapFileCount());
+ assertEquals(0,
localPartitionDiagnostics.getSwapQueueSize().getObjectCount());
+ assertEquals(0, queue.size().getObjectCount());
+ }
+
+
+ protected FlowFileQueue createFlowFileQueue(final String uuid) {
+ final ProcessScheduler processScheduler =
getFlowController().getProcessScheduler();
+ final ResourceClaimManager resourceClaimManager =
getFlowController().getResourceClaimManager();
+ final FlowFileSwapManager swapManager =
getFlowController().createSwapManager();
+
+ final AsyncLoadBalanceClientRegistry clientRegistry =
Mockito.mock(AsyncLoadBalanceClientRegistry.class);
+
+ return new SocketLoadBalancedFlowFileQueue(uuid,
ConnectionEventListener.NOP_EVENT_LISTENER, processScheduler,
getFlowFileRepository(), getProvenanceRepository(),
+ getContentRepository(), resourceClaimManager,
getClusterCoordinator(), clientRegistry, swapManager, 20000,
EventReporter.NO_OP);
+ }
+
+ @Override
+ protected boolean isClusteredTest() {
+ return true;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/StandaloneSwapFileIT.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/StandaloneSwapFileIT.java
new file mode 100644
index 0000000..c93bf7c
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/StandaloneSwapFileIT.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.integration.swap;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
+import org.apache.nifi.controller.queue.QueueDiagnostics;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.integration.FrameworkIntegrationTest;
+import org.apache.nifi.integration.processors.GenerateProcessor;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+public class StandaloneSwapFileIT extends FrameworkIntegrationTest {
+ @Test
+ public void testSwapOnRestart() throws ExecutionException,
InterruptedException, IOException {
+ Thread.sleep(20000L);
+
+ final ProcessorNode generator =
createProcessorNode(GenerateProcessor.class);
+
generator.setProperties(Collections.singletonMap(GenerateProcessor.COUNT.getName(),
"60000"));
+
+ Connection connection = connect(generator, getTerminateAllProcessor(),
REL_SUCCESS);
+ triggerOnce(generator);
+
+ FlowFileQueue queue = connection.getFlowFileQueue();
+ QueueDiagnostics diagnostics = queue.getQueueDiagnostics();
+ LocalQueuePartitionDiagnostics localPartitionDiagnostics =
diagnostics.getLocalQueuePartitionDiagnostics();
+
+ assertEquals(20_000,
localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
+ assertEquals(4, localPartitionDiagnostics.getSwapFileCount());
+ assertEquals(40_000,
localPartitionDiagnostics.getSwapQueueSize().getObjectCount());
+ assertEquals(60_000, queue.size().getObjectCount());
+
+ // restart nifi
+ restart();
+
+ // get the new Connection with the same ID
+ connection = getRootGroup().getConnection(connection.getIdentifier());
+ queue = connection.getFlowFileQueue();
+ diagnostics = queue.getQueueDiagnostics();
+ localPartitionDiagnostics =
diagnostics.getLocalQueuePartitionDiagnostics();
+
+ // Ensure we have the correct queue sizes
+ assertEquals(20_000,
localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
+ assertEquals(4, localPartitionDiagnostics.getSwapFileCount());
+ assertEquals(40_000,
localPartitionDiagnostics.getSwapQueueSize().getObjectCount());
+ assertEquals(60_000, queue.size().getObjectCount());
+
+ // Consume all the data
+ for (int i=0; i < 60_000; i++) {
+ final FlowFileRecord flowFile = queue.poll(Collections.emptySet());
+ assertNotNull(flowFile);
+
+ queue.acknowledge(flowFile);
+ }
+
+ assertNull(queue.poll(Collections.emptySet()));
+
+ // Check queue sizes again
+ diagnostics = queue.getQueueDiagnostics();
+ localPartitionDiagnostics =
diagnostics.getLocalQueuePartitionDiagnostics();
+
+ assertEquals(0,
localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
+ assertEquals(0, localPartitionDiagnostics.getSwapFileCount());
+ assertEquals(0,
localPartitionDiagnostics.getSwapQueueSize().getObjectCount());
+ assertEquals(0, queue.size().getObjectCount());
+ }
+
+
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/clustered-nifi.properties
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/clustered-nifi.properties
new file mode 100644
index 0000000..2516b42
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/clustered-nifi.properties
@@ -0,0 +1,256 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Core Properties #
+nifi.flow.configuration.file=./target/conf/flow.xml.gz
+nifi.flow.configuration.archive.enabled=true
+nifi.flow.configuration.archive.dir=./target/conf/archive/
+nifi.flow.configuration.archive.max.time=30 days
+nifi.flow.configuration.archive.max.storage=500 MB
+nifi.flow.configuration.archive.max.count=
+nifi.flowcontroller.autoResumeState=true
+nifi.flowcontroller.graceful.shutdown.period=10 sec
+nifi.flowservice.writedelay.interval=500 ms
+nifi.administrative.yield.duration=100 millis
+# If a component has no work to do (is "bored"), how long should we wait
before checking again for work?
+nifi.bored.yield.duration=10 millis
+nifi.queue.backpressure.count=10000
+nifi.queue.backpressure.size=1 GB
+
+nifi.authorizer.configuration.file=./target/conf/authorizers.xml
+nifi.login.identity.provider.configuration.file=./target/conf/login-identity-providers.xml
+nifi.templates.directory=./target/conf/templates
+nifi.ui.banner.text=
+nifi.ui.autorefresh.interval=30 sec
+nifi.nar.library.directory=./target/lib
+nifi.nar.library.autoload.directory=./target/extensions
+nifi.nar.working.directory=./target/work/nar/
+nifi.documentation.working.directory=./target/work/docs/components
+
+####################
+# State Management #
+####################
+nifi.state.management.configuration.file=src/test/resources/int-tests/state-management.xml
+# The ID of the local state provider
+nifi.state.management.provider.local=local-provider
+# The ID of the cluster-wide state provider. This will be ignored if NiFi is
not clustered but must be populated if running in a cluster.
+nifi.state.management.provider.cluster=zk-provider
+# Specifies whether or not this instance of NiFi should run an embedded
ZooKeeper server
+nifi.state.management.embedded.zookeeper.start=true
+# Properties file that provides the ZooKeeper properties to use if
<nifi.state.management.embedded.zookeeper.start> is set to true
+nifi.state.management.embedded.zookeeper.properties=src/test/resources/int-tests/zookeeper.properties
+
+
+# H2 Settings
+nifi.database.directory=./target/database_repository
+nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
+
+# FlowFile Repository
+nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository
+nifi.flowfile.repository.wal.implementation=org.apache.nifi.wali.SequentialAccessWriteAheadLog
+nifi.flowfile.repository.directory=./target/int-tests/flowfile_repository
+nifi.flowfile.repository.partitions=256
+nifi.flowfile.repository.checkpoint.interval=5 mins
+nifi.flowfile.repository.always.sync=false
+
+nifi.swap.manager.implementation=org.apache.nifi.controller.FileSystemSwapManager
+nifi.queue.swap.threshold=20000
+nifi.swap.in.period=5 sec
+nifi.swap.in.threads=1
+nifi.swap.out.period=5 sec
+nifi.swap.out.threads=4
+
+# Content Repository
+nifi.content.repository.implementation=org.apache.nifi.controller.repository.FileSystemRepository
+nifi.content.claim.max.appendable.size=1 MB
+nifi.content.claim.max.flow.files=100
+nifi.content.repository.directory.default=./target/int-tests/content_repository
+nifi.content.repository.archive.max.retention.period=12 hours
+nifi.content.repository.archive.max.usage.percentage=50%
+nifi.content.repository.archive.enabled=true
+nifi.content.repository.always.sync=false
+nifi.content.viewer.url=../nifi-content-viewer/
+
+# Provenance Repository Properties
+nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
+nifi.provenance.repository.debug.frequency=1_000_000
+nifi.provenance.repository.encryption.key.provider.implementation=
+nifi.provenance.repository.encryption.key.provider.location=
+nifi.provenance.repository.encryption.key.id=
+nifi.provenance.repository.encryption.key=
+
+# Persistent Provenance Repository Properties
+nifi.provenance.repository.directory.default=./target/int-tests/provenance_repository
+nifi.provenance.repository.max.storage.time=24 hours
+nifi.provenance.repository.max.storage.size=1 GB
+nifi.provenance.repository.rollover.time=30 secs
+nifi.provenance.repository.rollover.size=100 MB
+nifi.provenance.repository.query.threads=2
+nifi.provenance.repository.index.threads=2
+nifi.provenance.repository.compress.on.rollover=true
+nifi.provenance.repository.always.sync=false
+# Comma-separated list of fields. Fields that are not indexed will not be
searchable. Valid fields are:
+# EventType, FlowFileUUID, Filename, TransitURI, ProcessorID,
AlternateIdentifierURI, Relationship, Details
+nifi.provenance.repository.indexed.fields=EventType, FlowFileUUID, Filename,
ProcessorID, Relationship
+# FlowFile Attributes that should be indexed and made searchable. Some
examples to consider are filename, uuid, mime.type
+nifi.provenance.repository.indexed.attributes=
+# Large values for the shard size will result in more Java heap usage when
searching the Provenance Repository
+# but should provide better performance
+nifi.provenance.repository.index.shard.size=500 MB
+# Indicates the maximum length that a FlowFile attribute can be when
retrieving a Provenance Event from
+# the repository. If the length of any attribute exceeds this value, it will
be truncated when the event is retrieved.
+nifi.provenance.repository.max.attribute.length=65536
+nifi.provenance.repository.concurrent.merge.threads=2
+
+
+# Volatile Provenance Repository Properties
+nifi.provenance.repository.buffer.size=100000
+
+# Component Status Repository
+nifi.components.status.repository.implementation=org.apache.nifi.controller.status.history.VolatileComponentStatusRepository
+nifi.components.status.repository.buffer.size=1440
+nifi.components.status.snapshot.frequency=1 min
+
+# Site to Site properties
+nifi.remote.input.host=
+nifi.remote.input.secure=false
+nifi.remote.input.socket.port=
+nifi.remote.input.http.enabled=true
+nifi.remote.input.http.transaction.ttl=30 sec
+nifi.remote.contents.cache.expiration=30 secs
+
+# web properties #
+nifi.web.war.directory=./target/lib
+nifi.web.http.host=
+nifi.web.http.port=8080
+nifi.web.http.network.interface.default=
+nifi.web.https.host=
+nifi.web.https.port=
+nifi.web.https.network.interface.default=
+nifi.web.jetty.working.directory=./target/work/jetty
+nifi.web.jetty.threads=200
+nifi.web.max.header.size=16 KB
+nifi.web.proxy.context.path=
+nifi.web.proxy.host=
+
+# security properties #
+nifi.sensitive.props.key=
+nifi.sensitive.props.key.protected=
+nifi.sensitive.props.algorithm=PBEWITHMD5AND256BITAES-CBC-OPENSSL
+nifi.sensitive.props.provider=BC
+nifi.sensitive.props.additional.keys=
+
+nifi.security.keystore=
+nifi.security.keystoreType=
+nifi.security.keystorePasswd=
+nifi.security.keyPasswd=
+nifi.security.truststore=
+nifi.security.truststoreType=
+nifi.security.truststorePasswd=
+nifi.security.user.authorizer=managed-authorizer
+nifi.security.user.login.identity.provider=
+nifi.security.ocsp.responder.url=
+nifi.security.ocsp.responder.certificate=
+
+# OpenId Connect SSO Properties #
+nifi.security.user.oidc.discovery.url=
+nifi.security.user.oidc.connect.timeout=5 secs
+nifi.security.user.oidc.read.timeout=5 secs
+nifi.security.user.oidc.client.id=
+nifi.security.user.oidc.client.secret=
+nifi.security.user.oidc.preferred.jwsalgorithm=
+
+# Apache Knox SSO Properties #
+nifi.security.user.knox.url=
+nifi.security.user.knox.publicKey=
+nifi.security.user.knox.cookieName=hadoop-jwt
+nifi.security.user.knox.audiences=
+
+# Identity Mapping Properties #
+# These properties allow normalizing user identities such that identities
coming from different identity providers
+# (certificates, LDAP, Kerberos) can be treated the same internally in NiFi.
The following example demonstrates normalizing
+# DNs from certificates and principals from Kerberos into a common identity
string:
+#
+# nifi.security.identity.mapping.pattern.dn=^CN=(.*?), OU=(.*?), O=(.*?),
L=(.*?), ST=(.*?), C=(.*?)$
+# nifi.security.identity.mapping.value.dn=$1@$2
+# nifi.security.identity.mapping.transform.dn=NONE
+# nifi.security.identity.mapping.pattern.kerb=^(.*?)/instance@(.*?)$
+# nifi.security.identity.mapping.value.kerb=$1@$2
+# nifi.security.identity.mapping.transform.kerb=UPPER
+
+# Group Mapping Properties #
+# These properties allow normalizing group names coming from external sources
like LDAP. The following example
+# lowercases any group name.
+#
+# nifi.security.group.mapping.pattern.anygroup=^(.*)$
+# nifi.security.group.mapping.value.anygroup=$1
+# nifi.security.group.mapping.transform.anygroup=LOWER
+
+# cluster common properties (all nodes must have same values) #
+nifi.cluster.protocol.heartbeat.interval=5 sec
+nifi.cluster.protocol.is.secure=false
+
+# cluster node properties (only configure for cluster nodes) #
+nifi.cluster.is.node=true
+nifi.cluster.node.address=localhost
+nifi.cluster.node.protocol.port=0
+nifi.cluster.node.protocol.threads=10
+nifi.cluster.node.protocol.max.threads=50
+nifi.cluster.node.event.history.size=25
+nifi.cluster.node.connection.timeout=5 sec
+nifi.cluster.node.read.timeout=5 sec
+nifi.cluster.node.max.concurrent.requests=100
+nifi.cluster.firewall.file=
+nifi.cluster.flow.election.max.wait.time=5 mins
+nifi.cluster.flow.election.max.candidates=
+
+# cluster load balancing properties #
+nifi.cluster.load.balance.host=
+nifi.cluster.load.balance.port=6342
+nifi.cluster.load.balance.connections.per.node=4
+nifi.cluster.load.balance.max.thread.count=8
+nifi.cluster.load.balance.comms.timeout=30 sec
+
+# zookeeper properties, used for cluster management #
+nifi.zookeeper.connect.string=localhost:62181
+nifi.zookeeper.connect.timeout=3 secs
+nifi.zookeeper.session.timeout=3 secs
+nifi.zookeeper.root.node=/nifi-integration-tests
+
+# Zookeeper properties for the authentication scheme used when creating acls
on znodes used for cluster management
+# Values supported for nifi.zookeeper.auth.type are "default", which will
apply world/anyone rights on znodes
+# and "sasl" which will give rights to the sasl/kerberos identity used to
authenticate the nifi node
+# The identity is determined using the value in
nifi.kerberos.service.principal and the removeHostFromPrincipal
+# and removeRealmFromPrincipal values (which should align with the
kerberos.removeHostFromPrincipal and kerberos.removeRealmFromPrincipal
+# values configured on the zookeeper server).
+nifi.zookeeper.auth.type=
+nifi.zookeeper.kerberos.removeHostFromPrincipal=
+nifi.zookeeper.kerberos.removeRealmFromPrincipal=
+
+# kerberos #
+nifi.kerberos.krb5.file=
+
+# kerberos service principal #
+nifi.kerberos.service.principal=
+nifi.kerberos.service.keytab.location=
+
+# kerberos spnego principal #
+nifi.kerberos.spnego.principal=
+nifi.kerberos.spnego.keytab.location=
+nifi.kerberos.spnego.authentication.expiration=12 hours
+
+# external properties files for variable registry
+# supports a comma delimited list of file locations
+nifi.variable.registry.properties=
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/state-management.xml
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/state-management.xml
index f6b9768..159f518 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/state-management.xml
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/state-management.xml
@@ -21,4 +21,12 @@
<property name="Partitions">16</property>
<property name="Checkpoint Interval">2 mins</property>
</local-provider>
+ <cluster-provider>
+ <id>zk-provider</id>
+
<class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
+ <property name="Connect String">localhost:62181</property>
+ <property name="Root Node">/nifi-integration-test</property>
+ <property name="Session Timeout">30 seconds</property>
+ <property name="Access Control">Open</property>
+ </cluster-provider>
</stateManagement>
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/zookeeper.properties
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/zookeeper.properties
new file mode 100644
index 0000000..47b9290
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/zookeeper.properties
@@ -0,0 +1,45 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+#
+
+clientPort=62181
+initLimit=10
+autopurge.purgeInterval=24
+syncLimit=5
+tickTime=2000
+dataDir=target/state/zookeeper
+autopurge.snapRetainCount=30
+
+#
+# Specifies the servers that are part of this zookeeper ensemble. For
+# every NiFi instance running an embedded zookeeper, there needs to be
+# a server entry below. For instance:
+#
+# server.1=nifi-node1-hostname:2888:3888
+# server.2=nifi-node2-hostname:2888:3888
+# server.3=nifi-node3-hostname:2888:3888
+#
+# The index of the server corresponds to the myid file that gets created
+# in the dataDir of each node running an embedded zookeeper. See the
+# administration guide for more details.
+#
+
+server.1=localhost:5777:6777