This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 83ac191 NIFI-5997: If we swap out data, ensure that we do not
increment the size of the queue by the size of the data that we failed to swap
out. Also, if the FlowFile Repo does not know about a given swap file, do not
restore it on restart
83ac191 is described below
commit 83ac191736e8036f82da467ceb1940b50d9886f0
Author: Mark Payne <[email protected]>
AuthorDate: Fri Feb 1 13:10:51 2019 -0500
NIFI-5997: If we swap out data, ensure that we do not increment the size of
the queue by the size of the data that we failed to swap out. Also, if the
FlowFile Repo does not know about a given swap file, do not restore it on
restart
This closes #3290.
Signed-off-by: Bryan Bende <[email protected]>
---
.../exception/FlowFileAccessException.java | 2 +-
.../java/org/apache/nifi/wali/HashMapSnapshot.java | 20 ++--
.../nifi/wali/SequentialAccessWriteAheadLog.java | 20 +++-
.../org/apache/nifi/wali/WriteAheadSnapshot.java | 3 +
.../main/java/org/wali/WriteAheadRepository.java | 2 +-
.../controller/repository/FlowFileRepository.java | 26 ++++-
.../nifi-framework/nifi-framework-core/pom.xml | 1 +
.../nifi/controller/FileSystemSwapManager.java | 20 +++-
.../controller/queue/SwappablePriorityQueue.java | 8 +-
.../repository/StandardProcessSession.java | 30 +++++-
.../repository/VolatileFlowFileRepository.java | 14 ++-
.../repository/WriteAheadFlowFileRepository.java | 88 +++++++++++++---
.../apache/nifi/controller/MockSwapManager.java | 38 +++++--
.../nifi/controller/TestFileSystemSwapManager.java | 114 ++++++++++++++++++---
.../clustered/TestSwappablePriorityQueue.java | 59 ++++++++---
.../repository/TestStandardProcessSession.java | 113 +++++++++++++-------
.../TestWriteAheadFlowFileRepository.java | 96 +++++++++++++++++
.../src/test/resources/swap/444-old-swap-file.swap | Bin 0 -> 1730054 bytes
18 files changed, 536 insertions(+), 118 deletions(-)
diff --git
a/nifi-api/src/main/java/org/apache/nifi/processor/exception/FlowFileAccessException.java
b/nifi-api/src/main/java/org/apache/nifi/processor/exception/FlowFileAccessException.java
index c7e9c22..c64ea1c 100644
---
a/nifi-api/src/main/java/org/apache/nifi/processor/exception/FlowFileAccessException.java
+++
b/nifi-api/src/main/java/org/apache/nifi/processor/exception/FlowFileAccessException.java
@@ -18,7 +18,7 @@ package org.apache.nifi.processor.exception;
/**
* Indicates an issue occurred while accessing the content of a FlowFile, such
- * as an IOException.
+ * as an IOException, or while obtaining a reference to the FlowFile.
*
*/
public class FlowFileAccessException extends RuntimeException {
diff --git
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java
index 0dad62c..002ecd2 100644
---
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java
+++
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java
@@ -17,6 +17,12 @@
package org.apache.nifi.wali;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.SerDe;
+import org.wali.SerDeFactory;
+import org.wali.UpdateType;
+
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -37,12 +43,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wali.SerDe;
-import org.wali.SerDeFactory;
-import org.wali.UpdateType;
-
public class HashMapSnapshot<T> implements WriteAheadSnapshot<T>,
RecordLookup<T> {
private static final Logger logger =
LoggerFactory.getLogger(HashMapSnapshot.class);
private static final int ENCODING_VERSION = 1;
@@ -216,10 +216,14 @@ public class HashMapSnapshot<T> implements
WriteAheadSnapshot<T>, RecordLookup<T
return recordMap.get(recordId);
}
-
@Override
public SnapshotCapture<T> prepareSnapshot(final long maxTransactionId) {
- return new Snapshot(new HashMap<>(recordMap), new
HashSet<>(swapLocations), maxTransactionId);
+ return prepareSnapshot(maxTransactionId, this.swapLocations);
+ }
+
+ @Override
+ public SnapshotCapture<T> prepareSnapshot(final long maxTransactionId,
final Set<String> swapFileLocations) {
+ return new Snapshot(new HashMap<>(recordMap), new
HashSet<>(swapFileLocations), maxTransactionId);
}
private int getVersion() {
diff --git
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
index 11eb31c..240a212 100644
---
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
+++
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
@@ -29,6 +29,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -65,6 +66,7 @@ public class SequentialAccessWriteAheadLog<T> implements
WriteAheadRepository<T>
private final File journalsDirectory;
private final SerDeFactory<T> serdeFactory;
private final SyncListener syncListener;
+ private final Set<String> recoveredSwapLocations = new HashSet<>();
private final ReadWriteLock journalRWLock = new ReentrantReadWriteLock();
private final Lock journalReadLock = journalRWLock.readLock();
@@ -144,6 +146,7 @@ public class SequentialAccessWriteAheadLog<T> implements
WriteAheadRepository<T>
final long recoverStart = System.nanoTime();
recovered = true;
snapshotRecovery = snapshot.recover();
+
this.recoveredSwapLocations.addAll(snapshotRecovery.getRecoveredSwapLocations());
final long snapshotRecoveryMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - recoverStart);
@@ -212,7 +215,9 @@ public class SequentialAccessWriteAheadLog<T> implements
WriteAheadRepository<T>
final long recoveryMillis =
TimeUnit.MILLISECONDS.convert(recoverNanos, TimeUnit.NANOSECONDS);
logger.info("Successfully recovered {} records in {} milliseconds. Now
checkpointing to ensure that Write-Ahead Log is in a consistent state",
recoveredRecords.size(), recoveryMillis);
- checkpoint();
+ this.recoveredSwapLocations.addAll(swapLocations);
+
+ checkpoint(this.recoveredSwapLocations);
return recoveredRecords.values();
}
@@ -238,11 +243,15 @@ public class SequentialAccessWriteAheadLog<T> implements
WriteAheadRepository<T>
throw new IllegalStateException("Cannot retrieve the Recovered
Swap Locations until record recovery has been performed");
}
- return snapshotRecovery.getRecoveredSwapLocations();
+ return Collections.unmodifiableSet(this.recoveredSwapLocations);
}
@Override
public int checkpoint() throws IOException {
+ return checkpoint(null);
+ }
+
+ private int checkpoint(final Set<String> swapLocations) throws IOException
{
final SnapshotCapture<T> snapshotCapture;
final long startNanos = System.nanoTime();
@@ -276,7 +285,12 @@ public class SequentialAccessWriteAheadLog<T> implements
WriteAheadRepository<T>
final File[] existingFiles =
journalsDirectory.listFiles(this::isJournalFile);
existingJournals = (existingFiles == null) ? new File[0] :
existingFiles;
- snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 1);
+ if (swapLocations == null) {
+ snapshotCapture = snapshot.prepareSnapshot(nextTransactionId -
1);
+ } else {
+ snapshotCapture = snapshot.prepareSnapshot(nextTransactionId -
1, swapLocations);
+ }
+
// Create a new journal. We name the journal file <next
transaction id>.journal but it is possible
// that we could have an empty journal file already created. If
this happens, we don't want to create
diff --git
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java
index a4cbcd2..fd7cfd8 100644
---
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java
+++
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java
@@ -19,10 +19,13 @@ package org.apache.nifi.wali;
import java.io.IOException;
import java.util.Collection;
+import java.util.Set;
public interface WriteAheadSnapshot<T> {
SnapshotCapture<T> prepareSnapshot(long maxTransactionId);
+ SnapshotCapture<T> prepareSnapshot(long maxTransactionId, Set<String>
swapLocations);
+
void writeSnapshot(SnapshotCapture<T> snapshot) throws IOException;
SnapshotRecovery<T> recover() throws IOException;
diff --git
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java
index 05fc8a5..b7f18ab 100644
---
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java
+++
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java
@@ -65,7 +65,7 @@ public interface WriteAheadRepository<T> {
* if power is lost or the Operating System crashes
* @throws IOException if failure to update repo
* @throws IllegalArgumentException if multiple records within the given
- * Collection have the same ID, as specified by {@link Record#getId()}
+ * Collection have the same ID, as specified by {@link
SerDe#getRecordIdentifier(Object)}
* method
*
* @return the index of the Partition that performed the update
diff --git
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
index 6560c0a..b9ff249 100644
---
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
@@ -16,14 +16,14 @@
*/
package org.apache.nifi.controller.repository;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-
/**
* Implementations must be thread safe
*
@@ -128,4 +128,24 @@ public interface FlowFileRepository extends Closeable {
* @throws IOException if swap fails
*/
void swapFlowFilesIn(String swapLocation, List<FlowFileRecord>
flowFileRecords, FlowFileQueue flowFileQueue) throws IOException;
+
+ /**
+ * <p>
+ * Determines whether or not the given swap location suffix is a valid,
known location according to this FlowFileRepository. Note that while
+ * the {@link #swapFlowFilesIn(String, List, FlowFileQueue)} and {@link
#swapFlowFilesOut(List, FlowFileQueue, String)} methods expect
+ * a full "swap location", this method expects only the "suffix" of a swap
location. For example, if the location points to a file, this method
+ * would expect only the filename, not the full path.
+ * </p>
+ *
+ * <p>
+ * This method differs from the others because the other methods want to
store the swap location or recover from a given location. However,
+ * this method is used to verify that the location is known. If, for any
reason, NiFi is stopped, its FlowFile Repository is relocated to a new
+ * location (for example, a different disk partition), and NiFi is restarted, the
swap location would not match if we used the full location. Therefore,
+ * by using only the "suffix" (i.e., the filename for a file-based
implementation), we can avoid worrying about relocation.
+ * </p>
+ *
+ * @param swapLocationSuffix the suffix of the location to check
+ * @return <code>true</code> if the swap location is known and valid,
<code>false</code> otherwise
+ */
+ boolean isValidSwapLocationSuffix(String swapLocationSuffix);
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index 6bb1ea8..4b9af46 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -239,6 +239,7 @@
<exclude>src/test/resources/bye.txt</exclude>
<exclude>src/test/resources/old-swap-file.swap</exclude>
<exclude>src/test/resources/xxe_template.xml</exclude>
+
<exclude>src/test/resources/swap/444-old-swap-file.swap</exclude>
</excludes>
</configuration>
</plugin>
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 5f8f925..b2717c2 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -17,6 +17,7 @@
package org.apache.nifi.controller;
import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
@@ -27,6 +28,8 @@ import
org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.swap.SchemaSwapDeserializer;
import org.apache.nifi.controller.swap.SchemaSwapSerializer;
import org.apache.nifi.controller.swap.SimpleSwapDeserializer;
+import org.apache.nifi.controller.swap.StandardSwapContents;
+import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.controller.swap.SwapDeserializer;
import org.apache.nifi.controller.swap.SwapSerializer;
import org.apache.nifi.events.EventReporter;
@@ -95,14 +98,17 @@ public class FileSystemSwapManager implements
FlowFileSwapManager {
}
public FileSystemSwapManager(final NiFiProperties nifiProperties) {
- final Path flowFileRepoPath =
nifiProperties.getFlowFileRepositoryPath();
+ this(nifiProperties.getFlowFileRepositoryPath());
+ }
+ public FileSystemSwapManager(final Path flowFileRepoPath) {
this.storageDirectory = flowFileRepoPath.resolve("swap").toFile();
if (!storageDirectory.exists() && !storageDirectory.mkdirs()) {
throw new RuntimeException("Cannot create Swap Storage directory "
+ storageDirectory.getAbsolutePath());
}
}
+
@Override
public synchronized void initialize(final SwapManagerInitializationContext
initializationContext) {
this.claimManager = initializationContext.getResourceClaimManager();
@@ -152,6 +158,16 @@ public class FileSystemSwapManager implements
FlowFileSwapManager {
@Override
public SwapContents swapIn(final String swapLocation, final FlowFileQueue
flowFileQueue) throws IOException {
final File swapFile = new File(swapLocation);
+
+ final boolean validLocation =
flowFileRepository.isValidSwapLocationSuffix(swapFile.getName());
+ if (!validLocation) {
+ warn("Cannot swap in FlowFiles from location " + swapLocation + "
because the FlowFile Repository does not know about this Swap Location. " +
+ "This file should be manually removed. This typically occurs
when a Swap File is written but the FlowFile Repository is not updated yet to
reflect this. " +
+ "This is generally not a cause for concern, but may be
indicative of a failure to update the FlowFile Repository.");
+ final SwapSummary swapSummary = new StandardSwapSummary(new
QueueSize(0, 0), 0L, Collections.emptyList());
+ return new StandardSwapContents(swapSummary,
Collections.emptyList());
+ }
+
final SwapContents swapContents = peek(swapLocation, flowFileQueue);
flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(),
swapContents.getFlowFiles(), flowFileQueue);
@@ -311,7 +327,7 @@ public class FileSystemSwapManager implements
FlowFileSwapManager {
}
}
- Collections.sort(swapLocations, new SwapFileComparator());
+ swapLocations.sort(new SwapFileComparator());
return swapLocations;
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
index 058c714..df19f44 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
@@ -180,19 +180,23 @@ public class SwappablePriorityQueue {
int flowFilesSwappedOut = 0;
final List<String> swapLocations = new ArrayList<>(numSwapFiles);
for (int i = 0; i < numSwapFiles; i++) {
+ long bytesSwappedThisIteration = 0L;
+
// Create a new swap file for the next SWAP_RECORD_POLL_SIZE
records
final List<FlowFileRecord> toSwap = new
ArrayList<>(SWAP_RECORD_POLL_SIZE);
for (int j = 0; j < SWAP_RECORD_POLL_SIZE; j++) {
final FlowFileRecord flowFile = tempQueue.poll();
toSwap.add(flowFile);
- bytesSwappedOut += flowFile.getSize();
- flowFilesSwappedOut++;
+ bytesSwappedThisIteration += flowFile.getSize();
}
try {
Collections.reverse(toSwap); // currently ordered in reverse
priority order based on the ordering of the temp queue.
final String swapLocation = swapManager.swapOut(toSwap,
flowFileQueue, swapPartitionName);
swapLocations.add(swapLocation);
+
+ bytesSwappedOut += bytesSwappedThisIteration;
+ flowFilesSwappedOut += toSwap.size();
} catch (final IOException ioe) {
tempQueue.addAll(toSwap); // if we failed, we must add the
FlowFiles back to the queue.
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index cc3ac19..216449c 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -161,7 +161,7 @@ public final class StandardProcessSession implements
ProcessSession, ProvenanceE
// so that we are able to aggregate many into a single Fork Event.
private final Map<FlowFile, ProvenanceEventBuilder> forkEventBuilders =
new HashMap<>();
- private Checkpoint checkpoint = new Checkpoint();
+ private Checkpoint checkpoint = null;
private final ContentClaimWriteCache claimCache;
public StandardProcessSession(final RepositoryContext context, final
TaskTermination taskTermination) {
@@ -1489,7 +1489,18 @@ public final class StandardProcessSession implements
ProcessSession, ProvenanceE
private void registerDequeuedRecord(final FlowFileRecord flowFile, final
Connection connection) {
final StandardRepositoryRecord record = new
StandardRepositoryRecord(connection.getFlowFileQueue(), flowFile);
- records.put(flowFile.getId(), record);
+
+ // Ensure that the checkpoint does not have a FlowFile with the same
ID already. This should not occur,
+ // but this is a safety check just to make sure, because if it were to
occur, and we did process the FlowFile,
+ // we would have a lot of problems, since the map is keyed off of the
FlowFile ID.
+ if (this.checkpoint != null) {
+ final StandardRepositoryRecord checkpointedRecord =
this.checkpoint.getRecord(flowFile);
+ handleConflictingId(flowFile, connection, checkpointedRecord);
+ }
+
+ final StandardRepositoryRecord existingRecord =
records.putIfAbsent(flowFile.getId(), record);
+ handleConflictingId(flowFile, connection, existingRecord); // Ensure
that we have no conflicts
+
flowFilesIn++;
contentSizeIn += flowFile.getSize();
@@ -1503,6 +1514,21 @@ public final class StandardProcessSession implements
ProcessSession, ProvenanceE
incrementConnectionOutputCounts(connection, flowFile);
}
+ private void handleConflictingId(final FlowFileRecord flowFile, final
Connection connection, final StandardRepositoryRecord conflict) {
+ if (conflict == null) {
+ // No conflict
+ return;
+ }
+
+ LOG.error("Attempted to pull {} from {} but the Session already has a
FlowFile with the same ID ({}): {}, which was pulled from {}. This means that
the system has two FlowFiles with the" +
+ " same ID, which should not happen.", flowFile, connection,
flowFile.getId(), conflict.getCurrent(), conflict.getOriginalQueue());
+ connection.getFlowFileQueue().put(flowFile);
+
+ rollback(true, false);
+ throw new FlowFileAccessException("Attempted to pull a FlowFile with
ID " + flowFile.getId() + " from Connection "
+ + connection + " but a FlowFile with that ID already exists in the
session");
+ }
+
@Override
public void adjustCounter(final String name, final long delta, final
boolean immediate) {
verifyTaskActive();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
index dee5346..da714a6 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
@@ -16,16 +16,16 @@
*/
package org.apache.nifi.controller.repository;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* <p>
* An in-memory implementation of the {@link FlowFileRepository}. Upon
restart, all FlowFiles will be discarded, including those that have been
swapped out by a {@link FlowFileSwapManager}.
@@ -137,4 +137,8 @@ public class VolatileFlowFileRepository implements
FlowFileRepository {
public void swapFlowFilesOut(List<FlowFileRecord> swappedOut,
FlowFileQueue queue, String swapLocation) throws IOException {
}
+ @Override
+ public boolean isValidSwapLocationSuffix(final String swapLocationSuffix) { // this
repository never records a swap location, so no location is ever reported as valid
+ return false; // NOTE(review): swapFlowFilesOut is a no-op in this class; if
FileSystemSwapManager is paired with this repository, its swapIn() treats every swap file
as unknown and returns empty contents, which would drop swapped-out FlowFiles at runtime -
confirm this is intended
+ }
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index b5a61c6..d8e45f2 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -16,6 +16,19 @@
*/
package org.apache.nifi.controller.repository;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.MinimalLockingWriteAheadLog;
+import org.wali.SyncListener;
+import org.wali.WriteAheadRepository;
+
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
@@ -42,19 +55,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wali.MinimalLockingWriteAheadLog;
-import org.wali.SyncListener;
-import org.wali.WriteAheadRepository;
-
/**
* <p>
* Implements FlowFile Repository using WALI as the backing store.
@@ -101,6 +101,8 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
private final int numPartitions;
private final ScheduledExecutorService checkpointExecutor;
+ private final Set<String> swapLocationSuffixes = new HashSet<>(); //
guarded by synchronizing on the Set itself; suffixes of the swap locations currently known to this repository
+
// effectively final
private WriteAheadRepository<RepositoryRecord> wal;
private RepositoryRecordSerdeFactory serdeFactory;
@@ -134,7 +136,7 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
*/
public WriteAheadFlowFileRepository() {
alwaysSync = false;
- checkpointDelayMillis = 0l;
+ checkpointDelayMillis = 0L;
numPartitions = 0;
checkpointExecutor = null;
walImplementation = null;
@@ -278,6 +280,13 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
return !resourceClaim.isInUse();
}
+ @Override
+ public boolean isValidSwapLocationSuffix(final String swapLocationSuffix) {
+ synchronized (swapLocationSuffixes) {
+ return swapLocationSuffixes.contains(swapLocationSuffix);
+ }
+ }
+
private void updateRepository(final Collection<RepositoryRecord> records,
final boolean sync) throws IOException {
for (final RepositoryRecord record : records) {
if (record.getType() != RepositoryRecordType.DELETE &&
record.getType() != RepositoryRecordType.CONTENTMISSING
@@ -308,6 +317,10 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
// This does not, however, cause problems, as ContentRepository should
handle this
// This does indicate that some refactoring should probably be
performed, though, as this is not a very clean interface.
final Set<ResourceClaim> claimsToAdd = new HashSet<>();
+
+ final Set<String> swapLocationsAdded = new HashSet<>();
+ final Set<String> swapLocationsRemoved = new HashSet<>();
+
for (final RepositoryRecord record : records) {
if (record.getType() == RepositoryRecordType.DELETE) {
// For any DELETE record that we have, if claim is
destructible, mark it so
@@ -324,6 +337,14 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
if (record.getOriginalClaim() != null &&
record.getCurrentClaim() != record.getOriginalClaim() &&
isDestructable(record.getOriginalClaim())) {
claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
}
+ } else if (record.getType() == RepositoryRecordType.SWAP_OUT) {
+ final String swapLocation = record.getSwapLocation();
+ swapLocationsAdded.add(swapLocation);
+ swapLocationsRemoved.remove(swapLocation);
+ } else if (record.getType() == RepositoryRecordType.SWAP_IN) {
+ final String swapLocation = record.getSwapLocation();
+ swapLocationsRemoved.add(swapLocation);
+ swapLocationsAdded.remove(swapLocation);
}
final List<ContentClaim> transientClaims =
record.getTransientClaims();
@@ -336,6 +357,14 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
}
}
+ // If we have swapped files in or out, we need to ensure that we
update our swapLocationSuffixes.
+ if (!swapLocationsAdded.isEmpty() || !swapLocationsRemoved.isEmpty()) {
+ synchronized (swapLocationSuffixes) {
+ swapLocationsRemoved.forEach(loc ->
swapLocationSuffixes.remove(getLocationSuffix(loc)));
+ swapLocationsAdded.forEach(loc ->
swapLocationSuffixes.add(getLocationSuffix(loc)));
+ }
+ }
+
if (!claimsToAdd.isEmpty()) {
// Get / Register a Set<ContentClaim> for the given Partiton Index
final Integer partitionKey = Integer.valueOf(partitionIndex);
@@ -352,6 +381,29 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
}
}
+ /**
+ * Returns the last element of the given swap location (the file name, for a file-based location),
+ * ignoring a single trailing separator. Both '/' and backslash are treated as separators because
+ * FileSystemSwapManager hands this repository absolute file paths (which use backslashes on Windows)
+ * but validates locations using only File#getName(); recognizing '/' alone would never match on Windows.
+ *
+ * @param swapLocation the full swap location; may be null
+ * @return the suffix of the location, or null if swapLocation is null
+ */
+ protected static String getLocationSuffix(final String swapLocation) {
+ if (swapLocation == null) {
+ return null;
+ }
+
+ final String withoutTrailing = ((swapLocation.endsWith("/") || swapLocation.endsWith("\\"))
&& swapLocation.length() > 1) ? swapLocation.substring(0, swapLocation.length() -
1) : swapLocation;
+ final int lastIndex = Math.max(withoutTrailing.lastIndexOf('/'), withoutTrailing.lastIndexOf('\\'));
+ if (lastIndex < 0 || lastIndex >= withoutTrailing.length() - 1) {
+ return withoutTrailing;
+ }
+
+ return withoutTrailing.substring(lastIndex + 1);
+ }
+
@Override
public void onSync(final int partitionIndex) {
final BlockingQueue<ResourceClaim> claimQueue =
claimsAwaitingDestruction.get(Integer.valueOf(partitionIndex));
@@ -407,6 +450,10 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
// update WALI to indicate that the records were swapped out.
wal.update(repoRecords, true);
+ synchronized (this.swapLocationSuffixes) {
+ this.swapLocationSuffixes.add(getLocationSuffix(swapLocation));
+ }
+
logger.info("Successfully swapped out {} FlowFiles from {} to Swap
File {}", new Object[]{swappedOut.size(), queue, swapLocation});
}
@@ -423,6 +470,11 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
}
updateRepository(repoRecords, true);
+ // These FlowFiles are now swapped in, so the location must no longer be reported as valid (same as SWAP_IN record handling)
+ synchronized (this.swapLocationSuffixes) {
+ this.swapLocationSuffixes.remove(getLocationSuffix(swapLocation));
+ }
+
logger.info("Repository updated to reflect that {} FlowFiles were
swapped in to {}", new Object[]{swapRecords.size(), queue});
}
@@ -544,6 +596,12 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
// Repo was written using that impl, that we properly recover from the
implementation.
Collection<RepositoryRecord> recordList = wal.recoverRecords();
+ final Set<String> recoveredSwapLocations =
wal.getRecoveredSwapLocations();
+ synchronized (this.swapLocationSuffixes) {
+ recoveredSwapLocations.forEach(loc ->
this.swapLocationSuffixes.add(getLocationSuffix(loc)));
+ logger.debug("Recovered {} Swap Files: {}",
swapLocationSuffixes.size(), swapLocationSuffixes);
+ }
+
// If we didn't recover any records from our write-ahead log, attempt
to recover records from the other implementation
// of the write-ahead log. We do this in case the user changed the
"nifi.flowfile.repository.wal.impl" property.
// In such a case, we still want to recover the records from the
previous FlowFile Repository and write them into the new one.
@@ -591,7 +649,7 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
// Set the AtomicLong to 1 more than the max ID so that calls to
#getNextFlowFileSequence() will
// return the appropriate number.
flowFileSequenceGenerator.set(maxId + 1);
- logger.info("Successfully restored {} FlowFiles", recordList.size() -
numFlowFilesMissingQueue);
+ logger.info("Successfully restored {} FlowFiles and {} Swap Files",
recordList.size() - numFlowFilesMissingQueue, recoveredSwapLocations.size());
if (numFlowFilesMissingQueue > 0) {
logger.warn("On recovery, found {} FlowFiles whose queue no longer
exists. These FlowFiles will be dropped.", numFlowFilesMissingQueue);
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java
index 33b71f0..a1206c7 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java
@@ -17,16 +17,6 @@
package org.apache.nifi.controller;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord;
@@ -39,6 +29,16 @@ import
org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.swap.StandardSwapContents;
import org.apache.nifi.controller.swap.StandardSwapSummary;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
public class MockSwapManager implements FlowFileSwapManager {
public final Map<String, List<FlowFileRecord>> swappedOut = new
HashMap<>();
public int swapOutCalledCount = 0;
@@ -49,10 +49,22 @@ public class MockSwapManager implements FlowFileSwapManager
{
public int failSwapInAfterN = -1;
public Throwable failSwapInFailure = null;
+ private int failSwapOutAfterN = -1;
+ private IOException failSwapOutFailure = null;
+
public void setSwapInFailure(final Throwable t) {
this.failSwapInFailure = t;
}
+ public void setSwapOutFailureOnNthIteration(final int n) {
+ setSwapOutFailureOnNthIteration(n, null);
+ }
+
+ public void setSwapOutFailureOnNthIteration(final int n, final IOException
failureException) {
+ this.failSwapOutAfterN = n;
+ this.failSwapOutFailure = failureException;
+ }
+
@Override
public void initialize(final SwapManagerInitializationContext
initializationContext) {
@@ -65,6 +77,12 @@ public class MockSwapManager implements FlowFileSwapManager {
@Override
public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue
flowFileQueue, final String partitionName) throws IOException {
swapOutCalledCount++;
+
+ if (failSwapOutAfterN > -1 && swapOutCalledCount >= failSwapOutAfterN)
{
+ final IOException ioe = failSwapOutFailure == null ? new
IOException("Intentional Unit Test IOException on swap out call number " +
swapOutCalledCount) : failSwapOutFailure;
+ throw ioe;
+ }
+
final String location = UUID.randomUUID().toString() + "." +
partitionName;
swappedOut.put(location, new ArrayList<>(flowFiles));
return location;
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index 46bea31..dd71f0e 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -16,28 +16,37 @@
*/
package org.apache.nifi.controller;
-import static org.junit.Assert.assertEquals;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.controller.repository.FlowFileRepository;
-import org.apache.nifi.controller.repository.SwapContents;
-import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.events.EventReporter;
-import org.junit.Test;
-import org.mockito.Mockito;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
public class TestFileSystemSwapManager {
@@ -48,7 +57,7 @@ public class TestFileSystemSwapManager {
final DataInputStream in = new DataInputStream(new
BufferedInputStream(fis))) {
final FlowFileQueue flowFileQueue =
Mockito.mock(FlowFileQueue.class);
-
Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
+
when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
final FileSystemSwapManager swapManager = createSwapManager();
final SwapContents swapContents =
swapManager.peek("src/test/resources/old-swap-file.swap", flowFileQueue);
@@ -63,11 +72,88 @@ public class TestFileSystemSwapManager {
}
}
+ @Test
+ public void testFailureOnRepoSwapOut() throws IOException {
+ final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+
when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
+
+ final FlowFileRepository flowFileRepo =
Mockito.mock(FlowFileRepository.class);
+ Mockito.doThrow(new IOException("Intentional IOException for unit
test"))
+ .when(flowFileRepo).updateRepository(anyCollection());
+
+ final FileSystemSwapManager swapManager = createSwapManager(flowFileRepo); // wire in the failing repo stubbed above; the no-arg overload uses its own fresh mock
+
+ final List<FlowFileRecord> flowFileRecords = new ArrayList<>();
+ for (int i=0; i < 10000; i++) {
+ flowFileRecords.add(new MockFlowFileRecord(i));
+ }
+
+ try {
+ swapManager.swapOut(flowFileRecords, flowFileQueue, "partition-1");
+ Assert.fail("Expected IOException");
+ } catch (final IOException ioe) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testSwapFileUnknownToRepoNotSwappedIn() throws IOException {
+ final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+ when(flowFileQueue.getIdentifier()).thenReturn("");
+
+ final File targetDir = new File("target/swap");
+ targetDir.mkdirs();
+
+ final File targetFile = new File(targetDir, "444-old-swap-file.swap");
+ final File originalSwapFile = new
File("src/test/resources/swap/444-old-swap-file.swap");
+ try (final OutputStream fos = new FileOutputStream(targetFile);
+ final InputStream fis = new FileInputStream(originalSwapFile)) {
+ StreamUtils.copy(fis, fos);
+ }
+
+ final FileSystemSwapManager swapManager = new
FileSystemSwapManager(Paths.get("target"));
+ final ResourceClaimManager resourceClaimManager = new
NopResourceClaimManager();
+ final FlowFileRepository flowFileRepo =
Mockito.mock(FlowFileRepository.class);
+
+ swapManager.initialize(new SwapManagerInitializationContext() {
+ @Override
+ public ResourceClaimManager getResourceClaimManager() {
+ return resourceClaimManager;
+ }
+
+ @Override
+ public FlowFileRepository getFlowFileRepository() {
+ return flowFileRepo;
+ }
+
+ @Override
+ public EventReporter getEventReporter() {
+ return EventReporter.NO_OP;
+ }
+ });
+
+
when(flowFileRepo.isValidSwapLocationSuffix(anyString())).thenReturn(false);
+ final List<String> recoveredLocations =
swapManager.recoverSwapLocations(flowFileQueue, null);
+ assertEquals(1, recoveredLocations.size());
+
+ final String firstLocation = recoveredLocations.get(0);
+ final SwapContents emptyContents = swapManager.swapIn(firstLocation,
flowFileQueue);
+ assertEquals(0, emptyContents.getFlowFiles().size());
+
+
when(flowFileRepo.isValidSwapLocationSuffix(anyString())).thenReturn(true);
+
when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
+ final SwapContents contents = swapManager.swapIn(firstLocation,
flowFileQueue);
+ assertEquals(10000, contents.getFlowFiles().size());
+ }
private FileSystemSwapManager createSwapManager() {
+ final FlowFileRepository flowFileRepo =
Mockito.mock(FlowFileRepository.class);
+ return createSwapManager(flowFileRepo);
+ }
+
+ private FileSystemSwapManager createSwapManager(final FlowFileRepository
flowFileRepo) {
final FileSystemSwapManager swapManager = new FileSystemSwapManager();
final ResourceClaimManager resourceClaimManager = new
NopResourceClaimManager();
- final FlowFileRepository flowfileRepo =
Mockito.mock(FlowFileRepository.class);
swapManager.initialize(new SwapManagerInitializationContext() {
@Override
public ResourceClaimManager getResourceClaimManager() {
@@ -76,7 +162,7 @@ public class TestFileSystemSwapManager {
@Override
public FlowFileRepository getFlowFileRepository() {
- return flowfileRepo;
+ return flowFileRepo;
}
@Override
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
index 71ad257..ef1a063 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
@@ -17,21 +17,6 @@
package org.apache.nifi.controller.queue.clustered;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
import org.apache.nifi.controller.MockFlowFileRecord;
import org.apache.nifi.controller.MockSwapManager;
import org.apache.nifi.controller.queue.DropFlowFileAction;
@@ -42,16 +27,34 @@ import
org.apache.nifi.controller.queue.SwappablePriorityQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class TestSwappablePriorityQueue {
private MockSwapManager swapManager;
- private final EventReporter eventReporter = EventReporter.NO_OP;
+ private final List<String> events = new ArrayList<>();
+ private EventReporter eventReporter;
+
private final FlowFileQueue flowFileQueue =
Mockito.mock(FlowFileQueue.class);
private final DropFlowFileAction dropAction = (flowFiles, requestor) -> {
return new QueueSize(flowFiles.size(),
flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum());
@@ -63,12 +66,36 @@ public class TestSwappablePriorityQueue {
public void setup() {
swapManager = new MockSwapManager();
+ events.clear();
+ eventReporter = new EventReporter() {
+ @Override
+ public void reportEvent(final Severity severity, final String
category, final String message) {
+ events.add(message);
+ }
+ };
+
when(flowFileQueue.getIdentifier()).thenReturn("unit-test");
queue = new SwappablePriorityQueue(swapManager, 10000, eventReporter,
flowFileQueue, dropAction, "local");
}
@Test
+ public void testSwapOutFailureLeavesCorrectQueueSize() {
+ swapManager.setSwapOutFailureOnNthIteration(1, null);
+
+ for (int i = 0; i < 19999; i++) {
+ queue.put(new MockFlowFile(i));
+ }
+
+ assertEquals(19999, queue.size().getObjectCount());
+ assertEquals(0, events.size());
+
+ queue.put(new MockFlowFile(20000));
+ assertEquals(20000, queue.size().getObjectCount());
+ assertEquals(1, events.size()); // Expect a single failure event to be
emitted
+ }
+
+ @Test
public void testPrioritizer() {
final FlowFilePrioritizer prioritizer = (o1, o2) ->
Long.compare(o1.getId(), o2.getId());
queue.setPriorities(Collections.singletonList(prioritizer));
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index efe2bd4..7cd2fd6 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -277,14 +277,13 @@ public class TestStandardProcessSession {
connList.add(conn1);
connList.add(conn2);
- final FlowFileRecord flowFileRecord = new
StandardFlowFileRecord.Builder()
+ final StandardFlowFileRecord.Builder flowFileRecordBuilder = new
StandardFlowFileRecord.Builder()
.id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .build();
+ .entryDate(System.currentTimeMillis());
- flowFileQueue.put(flowFileRecord);
- flowFileQueue.put(flowFileRecord);
+ flowFileQueue.put(flowFileRecordBuilder.build());
+ flowFileQueue.put(flowFileRecordBuilder.id(1001).build());
when(connectable.getIncomingConnections()).thenReturn(connList);
@@ -296,6 +295,36 @@ public class TestStandardProcessSession {
}
@Test
+ public void testHandlingOfMultipleFlowFilesWithSameId() {
+ // Add two FlowFiles with the same ID
+ for (int i=0; i < 2; i++) {
+ final FlowFileRecord flowFileRecord = new
StandardFlowFileRecord.Builder()
+ .id(1000L)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .size(0L)
+ .build();
+
+ flowFileQueue.put(flowFileRecord);
+ }
+
+ final Relationship relationship = new
Relationship.Builder().name("A").build();
+
+ FlowFile ff1 = session.get();
+ assertNotNull(ff1);
+
+ session.transfer(ff1, relationship);
+
+ try {
+ session.get();
+ Assert.fail("Should not have been able to poll second FlowFile
with same ID");
+ } catch (final FlowFileAccessException e) {
+ // Expected
+ }
+ }
+
+
+ @Test
public void testCloneOriginalDataSmaller() throws IOException {
final byte[] originalContent = "hello".getBytes();
final byte[] replacementContent = "NEW DATA".getBytes();
@@ -416,14 +445,14 @@ public class TestStandardProcessSession {
connList.add(conn1);
connList.add(conn2);
- final FlowFileRecord flowFileRecord = new
StandardFlowFileRecord.Builder()
+
+ final StandardFlowFileRecord.Builder flowFileRecord = new
StandardFlowFileRecord.Builder()
.id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .build();
+ .entryDate(System.currentTimeMillis());
- flowFileQueue.put(flowFileRecord);
- flowFileQueue.put(flowFileRecord);
+ flowFileQueue.put(flowFileRecord.build());
+ flowFileQueue.put(flowFileRecord.id(1001).build());
when(connectable.getIncomingConnections()).thenReturn(connList);
@@ -475,14 +504,13 @@ public class TestStandardProcessSession {
connList.add(conn1);
connList.add(conn2);
- final FlowFileRecord flowFileRecord = new
StandardFlowFileRecord.Builder()
+ final StandardFlowFileRecord.Builder flowFileRecordBuilder = new
StandardFlowFileRecord.Builder()
.id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .build();
+ .entryDate(System.currentTimeMillis());
- flowFileQueue.put(flowFileRecord);
- flowFileQueue.put(flowFileRecord);
+ flowFileQueue.put(flowFileRecordBuilder.build());
+ flowFileQueue.put(flowFileRecordBuilder.id(10001L).build());
when(connectable.getIncomingConnections()).thenReturn(connList);
@@ -1383,10 +1411,11 @@ public class TestStandardProcessSession {
@Test
public void
testContentNotFoundExceptionThrownWhenUnableToReadDataStreamCallbackOffsetTooLarge()
{
final FlowFileRecord flowFileRecord = new
StandardFlowFileRecord.Builder()
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
+ .id(1)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
.contentClaim(new
StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true,
false), 0L))
- .build();
+ .build();
flowFileQueue.put(flowFileRecord);
FlowFile ff1 = session.get();
@@ -1399,12 +1428,13 @@ public class TestStandardProcessSession {
session.commit();
final FlowFileRecord flowFileRecord2 = new
StandardFlowFileRecord.Builder()
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
+ .id(2)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
.contentClaim(new
StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true,
false), 0L))
- .contentClaimOffset(1000L)
- .size(1000L)
- .build();
+ .contentClaimOffset(1000L)
+ .size(1000L)
+ .build();
flowFileQueue.put(flowFileRecord2);
// attempt to read the data.
@@ -1424,10 +1454,11 @@ public class TestStandardProcessSession {
@Test
public void
testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() {
final FlowFileRecord flowFileRecord = new
StandardFlowFileRecord.Builder()
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
+ .id(1)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
.contentClaim(new
StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true,
false), 0L))
- .build();
+ .build();
flowFileQueue.put(flowFileRecord);
@@ -1441,10 +1472,11 @@ public class TestStandardProcessSession {
session.commit();
final FlowFileRecord flowFileRecord2 = new
StandardFlowFileRecord.Builder()
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
+ .id(2)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
.contentClaim(new
StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true,
false), 0L))
- .contentClaimOffset(1000L).size(1L).build();
+ .contentClaimOffset(1000L).size(1L).build();
flowFileQueue.put(flowFileRecord2);
// attempt to read the data.
@@ -1544,11 +1576,15 @@ public class TestStandardProcessSession {
@Test
public void testRollbackAfterCheckpoint() {
- final FlowFileRecord flowFileRecord = new
StandardFlowFileRecord.Builder()
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
+ final StandardFlowFileRecord.Builder recordBuilder = new
StandardFlowFileRecord.Builder()
+ .id(1)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
.contentClaim(new
StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true,
false), 0L))
- .contentClaimOffset(0L).size(0L).build();
+ .contentClaimOffset(0L)
+ .size(0L);
+
+ final FlowFileRecord flowFileRecord = recordBuilder.build();
flowFileQueue.put(flowFileRecord);
final FlowFile originalFlowFile = session.get();
@@ -1574,7 +1610,7 @@ public class TestStandardProcessSession {
session.rollback();
- flowFileQueue.put(flowFileRecord);
+ flowFileQueue.put(recordBuilder.id(2).build());
assertFalse(flowFileQueue.isActiveQueueEmpty());
final FlowFile originalRound2 = session.get();
@@ -1596,8 +1632,8 @@ public class TestStandardProcessSession {
session.commit();
- // FlowFile transferred back to queue
- assertEquals(1, flowFileQueue.size().getObjectCount());
+ // FlowFiles transferred back to queue
+ assertEquals(2, flowFileQueue.size().getObjectCount());
assertFalse(flowFileQueue.isUnacknowledgedFlowFile());
assertFalse(flowFileQueue.isActiveQueueEmpty());
}
@@ -2116,6 +2152,11 @@ public class TestStandardProcessSession {
@Override
public void initialize(ResourceClaimManager claimManager) throws
IOException {
}
+
+ @Override
+ public boolean isValidSwapLocationSuffix(final String
swapLocationSuffix) {
+ return false;
+ }
}
private static class MockContentRepository implements ContentRepository {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index a3ee5c1..1761bd8 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -70,6 +70,7 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
@@ -396,6 +397,101 @@ public class TestWriteAheadFlowFileRepository {
}
+ @Test
+ public void testGetLocationSuffix() {
+ assertEquals("/", WriteAheadFlowFileRepository.getLocationSuffix("/"));
+ assertEquals("", WriteAheadFlowFileRepository.getLocationSuffix(""));
+ assertEquals(null,
WriteAheadFlowFileRepository.getLocationSuffix(null));
+ assertEquals("test.txt",
WriteAheadFlowFileRepository.getLocationSuffix("test.txt"));
+ assertEquals("test.txt",
WriteAheadFlowFileRepository.getLocationSuffix("/test.txt"));
+ assertEquals("test.txt",
WriteAheadFlowFileRepository.getLocationSuffix("/tmp/test.txt"));
+ assertEquals("test.txt",
WriteAheadFlowFileRepository.getLocationSuffix("//test.txt"));
+ assertEquals("test.txt",
WriteAheadFlowFileRepository.getLocationSuffix("/path/to/other/file/repository/test.txt"));
+ assertEquals("test.txt",
WriteAheadFlowFileRepository.getLocationSuffix("test.txt/"));
+ assertEquals("test.txt",
WriteAheadFlowFileRepository.getLocationSuffix("/path/to/test.txt/"));
+ }
+
+ @Test
+ public void testSwapLocationsRestored() throws IOException {
+ final Path path = Paths.get("target/test-swap-repo");
+ if (Files.exists(path)) {
+ FileUtils.deleteFile(path.toFile(), true);
+ }
+
+ final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null,
null));
+ repo.initialize(new StandardResourceClaimManager());
+
+ final TestQueueProvider queueProvider = new TestQueueProvider();
+ repo.loadFlowFiles(queueProvider, 0L);
+
+ final Connection connection = Mockito.mock(Connection.class);
+ when(connection.getIdentifier()).thenReturn("1234");
+
+ final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+ when(queue.getIdentifier()).thenReturn("1234");
+ when(connection.getFlowFileQueue()).thenReturn(queue);
+
+ queueProvider.addConnection(connection);
+
+ StandardFlowFileRecord.Builder ffBuilder = new
StandardFlowFileRecord.Builder();
+ ffBuilder.id(1L);
+ ffBuilder.size(0L);
+ final FlowFileRecord flowFileRecord = ffBuilder.build();
+
+ final List<RepositoryRecord> records = new ArrayList<>();
+ final StandardRepositoryRecord record = new
StandardRepositoryRecord(queue, flowFileRecord, "swap123");
+ record.setDestination(queue);
+ records.add(record);
+
+ repo.updateRepository(records);
+ repo.close();
+
+ // restore
+ final WriteAheadFlowFileRepository repo2 = new
WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null,
null));
+ repo2.initialize(new StandardResourceClaimManager());
+ repo2.loadFlowFiles(queueProvider, 0L);
+ assertTrue(repo2.isValidSwapLocationSuffix("swap123"));
+ assertFalse(repo2.isValidSwapLocationSuffix("other"));
+ repo2.close();
+ }
+
+ @Test
+ public void testSwapLocationsUpdatedOnRepoUpdate() throws IOException {
+ final Path path = Paths.get("target/test-swap-repo");
+ if (Files.exists(path)) {
+ FileUtils.deleteFile(path.toFile(), true);
+ }
+
+ final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null,
null));
+ repo.initialize(new StandardResourceClaimManager());
+
+ final TestQueueProvider queueProvider = new TestQueueProvider();
+ repo.loadFlowFiles(queueProvider, 0L);
+
+ final Connection connection = Mockito.mock(Connection.class);
+ when(connection.getIdentifier()).thenReturn("1234");
+
+ final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+ when(queue.getIdentifier()).thenReturn("1234");
+ when(connection.getFlowFileQueue()).thenReturn(queue);
+
+ queueProvider.addConnection(connection);
+
+ StandardFlowFileRecord.Builder ffBuilder = new
StandardFlowFileRecord.Builder();
+ ffBuilder.id(1L);
+ ffBuilder.size(0L);
+ final FlowFileRecord flowFileRecord = ffBuilder.build();
+
+ final List<RepositoryRecord> records = new ArrayList<>();
+ final StandardRepositoryRecord record = new
StandardRepositoryRecord(queue, flowFileRecord, "/tmp/swap123");
+ record.setDestination(queue);
+ records.add(record);
+
+ assertFalse(repo.isValidSwapLocationSuffix("swap123"));
+ repo.updateRepository(records);
+ assertTrue(repo.isValidSwapLocationSuffix("swap123"));
+ repo.close();
+ }
@Test
public void testResourceClaimsIncremented() throws IOException {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/swap/444-old-swap-file.swap
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/swap/444-old-swap-file.swap
new file mode 100755
index 0000000..0176ed9
Binary files /dev/null and
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/swap/444-old-swap-file.swap
differ