Repository: nifi
Updated Branches:
  refs/heads/master c2616e6fe -> d14229e44


NIFI-4774: Allow user to choose which write-ahead log implementation should be used by the WriteAheadFlowFileRepository

Removed TODO comment

Signed-off-by: Matthew Burgess <mattyb...@apache.org>

This closes #2487


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d14229e4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d14229e4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d14229e4

Branch: refs/heads/master
Commit: d14229e4407ce4587ba422fbd85e15a9d4f66f85
Parents: c2616e6
Author: Mark Payne <marka...@hotmail.com>
Authored: Thu Feb 22 12:51:38 2018 -0500
Committer: Matthew Burgess <mattyb...@apache.org>
Committed: Tue Mar 13 13:03:39 2018 -0400

----------------------------------------------------------------------
 .../src/main/asciidoc/administration-guide.adoc |  10 +-
 .../WriteAheadFlowFileRepository.java           | 156 +++++++++++++++----
 .../nifi-framework/nifi-resources/pom.xml       |   1 +
 .../src/main/resources/conf/nifi.properties     |   1 +
 4 files changed, 138 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d14229e4/nifi-docs/src/main/asciidoc/administration-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index ea60e0a..b1759e4 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -2795,7 +2795,15 @@ to configure it on a separate drive if available.
 
 |====
 |*Property*|*Description*
-|nifi.flowfile.repository.implementation|The FlowFile Repository implementation. The default value is `org.apache.nifi.controller.repository.WriteAheadFlowFileRepository` and should only be changed with caution. To store flowfiles in memory instead of on disk (at the risk of data loss in the event of power/machine failure), set this property to `org.apache.nifi.controller.repository.VolatileFlowFileRepository`.
+|nifi.flowfile.repository.implementation|The FlowFile Repository implementation. The default value is `org.apache.nifi.controller.repository.WriteAheadFlowFileRepository` and should only be changed with caution. To store FlowFiles in memory instead of on disk (accepting data loss in the event of power/machine failure or a restart of NiFi), set this property to `org.apache.nifi.controller.repository.VolatileFlowFileRepository`.
+|nifi.flowfile.repository.wal.implementation|If the repository implementation is configured to use the `WriteAheadFlowFileRepository`, this property can be used to specify which implementation of the
+Write-Ahead Log should be used. The default value is `org.apache.nifi.wali.SequentialAccessWriteAheadLog`. This version of the write-ahead log was added in version 1.6.0 of Apache NiFi and was developed
+to address an issue in the older implementation. In the event of power loss or an operating system crash, the old implementation was susceptible to recovering FlowFiles
+incorrectly. This could lead to the wrong attributes or content being assigned to a FlowFile upon restart following the power loss or OS crash. However, one can still opt into
+using the previous implementation and accept that risk if desired (for example, if the new implementation were to exhibit some unexpected error).
+To do so, set the value of this property to `org.wali.MinimalLockingWriteAheadLog`.
+If the value of this property is changed, then upon restart NiFi will still recover the records written by the previously configured implementation, write them using the newly configured
+implementation, and delete the files written by the previously configured one.
 |nifi.flowfile.repository.directory*|The location of the FlowFile Repository. The default value is `./flowfile_repository`.
 |nifi.flowfile.repository.partitions|The number of partitions. The default value is `256`.
 |nifi.flowfile.repository.checkpoint.interval| The FlowFile Repository checkpoint interval. The default value is `2 mins`.
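To make the documented behavior concrete, the following is a minimal sketch of how a `nifi.properties` value maps to an effective write-ahead log: a missing property falls back to the documented default, and anything other than the two documented class names is rejected. `WalConfigCheck` and `effectiveWal` are hypothetical names rather than NiFi APIs, and trimming trailing whitespace is an assumption of this sketch.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.Set;

// Hypothetical pre-flight check, not a NiFi API: resolves which write-ahead log
// an installation would use from the documented property and its default.
final class WalConfigCheck {
    static final String WAL_PROPERTY = "nifi.flowfile.repository.wal.implementation";
    static final String SEQUENTIAL = "org.apache.nifi.wali.SequentialAccessWriteAheadLog";
    static final String MINIMAL_LOCKING = "org.wali.MinimalLockingWriteAheadLog";
    private static final Set<String> SUPPORTED = Set.of(SEQUENTIAL, MINIMAL_LOCKING);

    private WalConfigCheck() {
    }

    // Returns the effective implementation, or throws if the value is not one of the documented classes.
    static String effectiveWal(final String propertiesText) throws IOException {
        final Properties props = new Properties();
        props.load(new StringReader(propertiesText));
        // A missing key falls back to the default; trimming is this sketch's own choice.
        final String value = props.getProperty(WAL_PROPERTY, SEQUENTIAL).trim();
        if (!SUPPORTED.contains(value)) {
            throw new IllegalStateException("Invalid value for '" + WAL_PROPERTY + "': '" + value + "'");
        }
        return value;
    }
}
```

An empty file therefore resolves to the sequential-access log, while setting the property to `org.wali.MinimalLockingWriteAheadLog` opts back into the older implementation.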

http://git-wip-us.apache.org/repos/asf/nifi/blob/d14229e4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 3901029..302288a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -80,6 +80,14 @@ import org.wali.WriteAheadRepository;
  */
 public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncListener {
     private static final String FLOWFILE_REPOSITORY_DIRECTORY_PREFIX = "nifi.flowfile.repository.directory";
+    private static final String WRITE_AHEAD_LOG_IMPL = "nifi.flowfile.repository.wal.implementation";
+
+    private static final String SEQUENTIAL_ACCESS_WAL = "org.apache.nifi.wali.SequentialAccessWriteAheadLog";
+    private static final String MINIMAL_LOCKING_WALI = "org.wali.MinimalLockingWriteAheadLog";
+    private static final String DEFAULT_WAL_IMPLEMENTATION = SEQUENTIAL_ACCESS_WAL;
+
+    private final String walImplementation;
+    private final NiFiProperties nifiProperties;
 
     private final AtomicLong flowFileSequenceGenerator = new AtomicLong(0L);
     private final boolean alwaysSync;
@@ -88,7 +96,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
     private volatile ScheduledFuture<?> checkpointFuture;
 
     private final long checkpointDelayMillis;
-    private final File flowFileRepositoryPath;
+    private final List<File> flowFileRepositoryPaths = new ArrayList<>();
     private final List<File> recoveryFiles = new ArrayList<>();
     private final int numPartitions;
     private final ScheduledExecutorService checkpointExecutor;
@@ -129,17 +137,22 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         checkpointDelayMillis = 0l;
         numPartitions = 0;
         checkpointExecutor = null;
-        flowFileRepositoryPath = null;
+        walImplementation = null;
+        nifiProperties = null;
     }
 
     public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) {
         alwaysSync = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC, "false"));
+        this.nifiProperties = nifiProperties;
 
         // determine the database file path and ensure it exists
-        final String directoryName = nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX);
-        flowFileRepositoryPath = new File(directoryName);
+        String writeAheadLogImpl = nifiProperties.getProperty(WRITE_AHEAD_LOG_IMPL);
+        if (writeAheadLogImpl == null) {
+            writeAheadLogImpl = DEFAULT_WAL_IMPLEMENTATION;
+        }
+        this.walImplementation = writeAheadLogImpl;
 
-        // We used to use the MinimalLockingWriteAheadLog, but we now use the SequentialAccessWriteAheadLog. Since the
+        // The repository may have been written by one write-ahead log implementation while the other is now configured, so we must account for that. Since the
         // MinimalLockingWriteAheadLog supports multiple partitions, we need to ensure that we recover records from all
         // partitions, so we build up a List of Files for the recovery files.
         for (final String propertyName : nifiProperties.getPropertyKeys()) {
@@ -149,6 +162,14 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
             }
         }
 
+        if (walImplementation.equals(SEQUENTIAL_ACCESS_WAL)) {
+            final String directoryName = nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX);
+            flowFileRepositoryPaths.add(new File(directoryName));
+        } else {
+            flowFileRepositoryPaths.addAll(recoveryFiles);
+        }
+
+
         numPartitions = nifiProperties.getFlowFileRepositoryPartitions();
         checkpointDelayMillis = FormatUtils.getTimeDuration(nifiProperties.getFlowFileRepositoryCheckpointInterval(), TimeUnit.MILLISECONDS);
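The constructor now picks the directories it manages based on the configured implementation. The sketch below restates that selection outside NiFi. Because the body of the property-key loop is elided from this diff, it assumes that every key starting with `nifi.flowfile.repository.directory` names one directory; `RepositoryDirs` and `select` are hypothetical names.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the directory selection, not NiFi code.
final class RepositoryDirs {
    static final String DIR_PREFIX = "nifi.flowfile.repository.directory";
    static final String SEQUENTIAL = "org.apache.nifi.wali.SequentialAccessWriteAheadLog";

    private RepositoryDirs() {
    }

    static List<File> select(final Map<String, String> props, final String walImpl) {
        if (SEQUENTIAL.equals(walImpl)) {
            // The sequential-access log writes to exactly one directory.
            return List.of(new File(props.get(DIR_PREFIX)));
        }
        // Otherwise use every directory key (assumed); TreeMap gives a stable order.
        final List<File> dirs = new ArrayList<>();
        for (final Map.Entry<String, String> entry : new TreeMap<>(props).entrySet()) {
            if (entry.getKey().startsWith(DIR_PREFIX)) {
                dirs.add(new File(entry.getValue()));
            }
        }
        return dirs;
    }
}
```

For example, with a hypothetical second key `nifi.flowfile.repository.directory.extra`, the sequential-access log still uses a single directory, while the Minimal Locking log would be given both.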
 
@@ -159,14 +180,29 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
     public void initialize(final ResourceClaimManager claimManager) throws IOException {
         this.claimManager = claimManager;
 
-        Files.createDirectories(flowFileRepositoryPath.toPath());
+        for (final File file : flowFileRepositoryPaths) {
+            Files.createDirectories(file.toPath());
+        }
 
         // TODO: Should ensure that only 1 instance running and pointing at a particular path
         // TODO: Allow for backup path that can be used if disk out of space?? Would allow a snapshot to be stored on
         // backup and then the data deleted from the normal location; then can move backup to normal location and
         // delete backup. On restore, if no files exist in partition's directory, would have to check backup directory
         serdeFactory = new RepositoryRecordSerdeFactory(claimManager);
-        wal = new SequentialAccessWriteAheadLog<>(flowFileRepositoryPath, serdeFactory, this);
+
+        if (walImplementation.equals(SEQUENTIAL_ACCESS_WAL)) {
+            wal = new SequentialAccessWriteAheadLog<>(flowFileRepositoryPaths.get(0), serdeFactory, this);
+        } else if (walImplementation.equals(MINIMAL_LOCKING_WALI)) {
+            final SortedSet<Path> paths = flowFileRepositoryPaths.stream()
+                .map(File::toPath)
+                .collect(Collectors.toCollection(TreeSet::new));
+
+            wal = new MinimalLockingWriteAheadLog<>(paths, numPartitions, serdeFactory, this);
+        } else {
+            throw new IllegalStateException("Cannot create Write-Ahead Log because the configured property '" + WRITE_AHEAD_LOG_IMPL + "' has an invalid value of '" + walImplementation
+                + "'. Please update nifi.properties to indicate a valid value for this property.");
+        }
+
         logger.info("Initialized FlowFile Repository using {} partitions", numPartitions);
     }
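The dispatch in `initialize()` can be exercised without NiFi on the classpath by returning a description of the constructor that would be invoked instead of building a real log. The hypothetical `WalDispatch.plan` below mirrors the three branches above, including the `TreeSet` collector that turns the configured directories into the `SortedSet<Path>` handed to `MinimalLockingWriteAheadLog`.

```java
import java.io.File;
import java.nio.file.Path;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical sketch of the branch structure in initialize(); it describes the
// constructor call rather than creating a write-ahead log.
final class WalDispatch {
    private WalDispatch() {
    }

    static String plan(final String walImpl, final List<File> dirs) {
        switch (walImpl) {
            case "org.apache.nifi.wali.SequentialAccessWriteAheadLog":
                // Only the first (and expected only) directory is used.
                return "SequentialAccessWriteAheadLog(" + dirs.get(0) + ")";
            case "org.wali.MinimalLockingWriteAheadLog": {
                // A TreeSet drops duplicate paths and orders them lexicographically.
                final SortedSet<Path> paths = dirs.stream()
                    .map(File::toPath)
                    .collect(Collectors.toCollection(TreeSet::new));
                return "MinimalLockingWriteAheadLog(" + paths + ")";
            }
            default:
                // Unknown values fail fast, as the repository does.
                throw new IllegalStateException("Invalid write-ahead log implementation: '" + walImpl + "'");
        }
    }
}
```

Failing fast on an unrecognized class name avoids silently running with a log the administrator did not ask for.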
 
@@ -187,12 +223,22 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
 
     @Override
     public long getStorageCapacity() throws IOException {
-        return Files.getFileStore(flowFileRepositoryPath.toPath()).getTotalSpace();
+        long capacity = 0L;
+        for (final File file : flowFileRepositoryPaths) {
+            capacity += Files.getFileStore(file.toPath()).getTotalSpace();
+        }
+
+        return capacity;
     }
 
     @Override
     public long getUsableStorageSpace() throws IOException {
-        return Files.getFileStore(flowFileRepositoryPath.toPath()).getUsableSpace();
+        long usableSpace = 0L;
+        for (final File file : flowFileRepositoryPaths) {
+            usableSpace += Files.getFileStore(file.toPath()).getUsableSpace();
+        }
+
+        return usableSpace;
     }
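Summing `getTotalSpace()` per directory counts a volume once for every configured directory that lives on it, which can overstate capacity when the Minimal Locking log is given several directories on the same disk. A minimal sketch of a deduplicating variant follows. It assumes that `FileStore.equals()` identifies the same underlying volume, which is worth verifying for the file system provider in use; `DistinctStoreCapacity` is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical variant of getStorageCapacity() that counts each file store once.
final class DistinctStoreCapacity {
    private DistinctStoreCapacity() {
    }

    static long totalSpace(final List<Path> dirs) throws IOException {
        // Collapse directories that resolve to the same store (relies on FileStore.equals()).
        final Set<FileStore> stores = new LinkedHashSet<>();
        for (final Path dir : dirs) {
            stores.add(Files.getFileStore(dir));
        }
        long total = 0L;
        for (final FileStore store : stores) {
            total += store.getTotalSpace();
        }
        return total;
    }
}
```

The same deduplication would apply to `getUsableStorageSpace()` by summing `getUsableSpace()` instead.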
 
     @Override
@@ -369,9 +415,62 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         logger.info("Repository updated to reflect that {} FlowFiles were swapped in to {}", new Object[]{swapRecords.size(), queue});
     }
 
+    private void deleteRecursively(final File dir) {
+        final File[] children = dir.listFiles();
+
+        if (children != null) {
+            for (final File child : children) {
+                final boolean deleted = child.delete();
+                if (!deleted) {
+                    logger.warn("Failed to delete old file {}; this file should be cleaned up manually", child);
+                }
+            }
+        }
+
+        if (!dir.delete()) {
+            logger.warn("Failed to delete old directory {}; this directory should be cleaned up manually", dir);
+        }
+    }
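Despite its name, `deleteRecursively` makes a single pass over a directory's immediate children, so a non-empty subdirectory would fail to delete and be reported for manual cleanup. That appears sufficient for the partition and journal directories it is applied to here. If deeper trees ever had to be removed, a sketch using `Files.walkFileTree` could look like the following; `RecursiveDelete` is a hypothetical helper, and unlike the method above it propagates the first failure instead of logging and continuing.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

// Hypothetical fully recursive delete: files first, then each directory once it is empty.
final class RecursiveDelete {
    private RecursiveDelete() {
    }

    static void deleteTree(final Path root) throws IOException {
        if (!Files.exists(root)) {
            return; // nothing to clean up
        }
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) throws IOException {
                if (exc != null) {
                    throw exc; // a directory could not be listed
                }
                Files.delete(dir); // all children are gone by now
                return FileVisitResult.CONTINUE;
            }
        });
    }
}
```

Because `walkFileTree` does not follow symbolic links by default, a link inside the tree is removed without touching its target.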
+
+    private Optional<Collection<RepositoryRecord>> migrateFromSequentialAccessLog(final WriteAheadRepository<RepositoryRecord> toUpdate) throws IOException {
+        final String recoveryDirName = nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX);
+        final File recoveryDir = new File(recoveryDirName);
+        if (!recoveryDir.exists()) {
+            return Optional.empty();
+        }
+
+        final WriteAheadRepository<RepositoryRecord> recoveryWal = new SequentialAccessWriteAheadLog<>(recoveryDir, serdeFactory, this);
+        logger.info("Encountered FlowFile Repository that was written using the Sequential Access Write Ahead Log. Will recover from this version.");
+
+        final Collection<RepositoryRecord> recordList;
+        try {
+            recordList = recoveryWal.recoverRecords();
+        } finally {
+            recoveryWal.shutdown();
+        }
+
+        toUpdate.update(recordList, true);
+
+        logger.info("Successfully recovered files from existing Write-Ahead Log and transitioned to new Write-Ahead Log. Will now delete old files.");
+
+        final File journalsDir = new File(recoveryDir, "journals");
+        deleteRecursively(journalsDir);
+
+        final File checkpointFile = new File(recoveryDir, "checkpoint");
+        if (!checkpointFile.delete() && checkpointFile.exists()) {
+            logger.warn("Failed to delete old file {}; this file should be cleaned up manually", checkpointFile);
+        }
+
+        final File partialFile = new File(recoveryDir, "checkpoint.partial");
+        if (!partialFile.delete() && partialFile.exists()) {
+            logger.warn("Failed to delete old file {}; this file should be cleaned up manually", partialFile);
+        }
+
+        return Optional.of(recordList);
+    }
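`File.delete()` returns `false` both when deletion fails and when the file does not exist, which is why the method above checks `exists()` before warning. The NIO API separates those cases: `Files.deleteIfExists` returns `false` for a missing file and throws `IOException` on a real failure. A minimal sketch of the checkpoint cleanup written that way follows; the hypothetical `CheckpointCleanup` collects failures where the repository logs a warning.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical NIO version of the "checkpoint" / "checkpoint.partial" cleanup.
final class CheckpointCleanup {
    private CheckpointCleanup() {
    }

    // Returns the names of files that exist but could not be deleted.
    static List<String> deleteCheckpointFiles(final Path recoveryDir) {
        final List<String> failures = new ArrayList<>();
        for (final String name : List.of("checkpoint", "checkpoint.partial")) {
            try {
                // A missing file is simply skipped; only real failures throw.
                Files.deleteIfExists(recoveryDir.resolve(name));
            } catch (final IOException e) {
                // The repository logs a warning asking for manual cleanup; here we record the name.
                failures.add(name);
            }
        }
        return failures;
    }
}
```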
 
     @SuppressWarnings("deprecation")
-    private Optional<Collection<RepositoryRecord>> recoverFromOldWriteAheadLog() throws IOException {
+    private Optional<Collection<RepositoryRecord>> migrateFromMinimalLockingLog(final WriteAheadRepository<RepositoryRecord> toUpdate) throws IOException {
         final List<File> partitionDirs = new ArrayList<>();
         for (final File recoveryFile : recoveryFiles) {
             final File[] partitions = recoveryFile.listFiles(file -> file.getName().startsWith("partition-"));
@@ -384,7 +483,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
             return Optional.empty();
         }
 
-        logger.info("Encountered FlowFile Repository that was written using an old version of the Write-Ahead Log. "
+        logger.info("Encountered FlowFile Repository that was written using the 'Minimal Locking Write-Ahead Log'. "
             + "Will recover from this version and re-write the repository using the new version of the Write-Ahead Log.");
 
         final SortedSet<Path> paths = recoveryFiles.stream()
@@ -399,25 +498,12 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
             minimalLockingWal.shutdown();
         }
 
-        wal.update(recordList, true);
+        toUpdate.update(recordList, true);
 
         // Delete the old repository
         logger.info("Successfully recovered files from existing Write-Ahead Log and transitioned to new implementation. Will now delete old files.");
         for (final File partitionDir : partitionDirs) {
-            final File[] children = partitionDir.listFiles();
-
-            if (children != null) {
-                for (final File child : children) {
-                    final boolean deleted = child.delete();
-                    if (!deleted) {
-                        logger.warn("Failed to delete old file {}; this file should be cleaned up manually", child);
-                    }
-                }
-            }
-
-            if (!partitionDir.delete()) {
-                logger.warn("Failed to delete old directory {}; this directory should be cleaned up manually", partitionDir);
-            }
+            deleteRecursively(partitionDir);
         }
 
         for (final File recoveryFile : recoveryFiles) {
@@ -447,8 +533,21 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         // Repo was written using that impl, that we properly recover from the implementation.
         Collection<RepositoryRecord> recordList = wal.recoverRecords();
 
+        // If we didn't recover any records from our write-ahead log, attempt to recover records from the other implementation
+        // of the write-ahead log. We do this in case the user changed the "nifi.flowfile.repository.wal.implementation" property.
+        // In such a case, we still want to recover the records from the previous FlowFile Repository and write them into the new one.
+        // Since these implementations do not write to the same files, they will not interfere with one another. If we do recover records,
+        // then we will update the new WAL (with fsync()) and delete the old repository so that we won't recover it again.
         if (recordList == null || recordList.isEmpty()) {
-            recordList = recoverFromOldWriteAheadLog().orElse(new ArrayList<>());
+            if (walImplementation.equals(SEQUENTIAL_ACCESS_WAL)) {
+                // Configured to use Sequential Access WAL but it has no records. Check if there are records in
+                // a MinimalLockingWriteAheadLog that we can recover.
+                recordList = migrateFromMinimalLockingLog(wal).orElse(new ArrayList<>());
+            } else {
+                // Configured to use Minimal Locking WAL but it has no records. Check if there are records in
+                // a SequentialAccessWriteAheadLog that we can recover.
+                recordList = migrateFromSequentialAccessLog(wal).orElse(new ArrayList<>());
+            }
         }
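The recovery order described in the comments above can be modeled in memory: recover from the configured log, and only if it returns nothing, recover from the other implementation and write those records into the configured one with a forced sync before any old files are deleted. The `Log` interface and `RecoveryFallback` class are hypothetical stand-ins for NiFi's `WriteAheadRepository`, and an absent old repository is modeled as an empty `Optional`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;

// Hypothetical in-memory model of the recovery fallback, not NiFi code.
final class RecoveryFallback {
    interface Log {
        Collection<String> recoverRecords();

        void update(Collection<String> records, boolean forceSync);
    }

    private RecoveryFallback() {
    }

    static Collection<String> recover(final Log configured, final Optional<Log> other) {
        final Collection<String> records = configured.recoverRecords();
        if (records != null && !records.isEmpty()) {
            return records; // normal restart: nothing to migrate
        }
        return other.map(old -> {
            final Collection<String> migrated = old.recoverRecords();
            // Persist into the configured log (with sync) before the old files would be removed.
            configured.update(migrated, true);
            return migrated;
        }).orElse(new ArrayList<>());
    }
}
```

Writing to the new log before deleting the old files means a crash in between leaves the records recoverable from at least one of the two logs.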
 
         serdeFactory.setQueueMap(null);
@@ -495,8 +594,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
                     final int numRecordsCheckpointed = checkpoint();
                     final long end = System.nanoTime();
                     final long millis = TimeUnit.MILLISECONDS.convert(end - start, TimeUnit.NANOSECONDS);
-                    logger.info("Successfully checkpointed FlowFile Repository with {} records in {} milliseconds",
-                            new Object[]{numRecordsCheckpointed, millis});
+                    logger.info("Successfully checkpointed FlowFile Repository with {} records in {} milliseconds", numRecordsCheckpointed, millis);
                 } catch (final Throwable t) {
                     logger.error("Unable to checkpoint FlowFile Repository due to " + t.toString(), t);
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d14229e4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index 59a69c5..9d81007 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -52,6 +52,7 @@
         <nifi.state.management.provider.cluster>zk-provider</nifi.state.management.provider.cluster>
 
         <nifi.flowfile.repository.implementation>org.apache.nifi.controller.repository.WriteAheadFlowFileRepository</nifi.flowfile.repository.implementation>
+        <nifi.flowfile.repository.wal.implementation>org.apache.nifi.wali.SequentialAccessWriteAheadLog</nifi.flowfile.repository.wal.implementation>
         <nifi.flowfile.repository.directory>./flowfile_repository</nifi.flowfile.repository.directory>
         <nifi.flowfile.repository.partitions>256</nifi.flowfile.repository.partitions>
         <nifi.flowfile.repository.checkpoint.interval>2 mins</nifi.flowfile.repository.checkpoint.interval>

http://git-wip-us.apache.org/repos/asf/nifi/blob/d14229e4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 868ec0a..5a167f7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -56,6 +56,7 @@ nifi.h2.url.append=${nifi.h2.url.append}
 
 # FlowFile Repository
 nifi.flowfile.repository.implementation=${nifi.flowfile.repository.implementation}
+nifi.flowfile.repository.wal.implementation=${nifi.flowfile.repository.wal.implementation}
 nifi.flowfile.repository.directory=${nifi.flowfile.repository.directory}
 nifi.flowfile.repository.partitions=${nifi.flowfile.repository.partitions}
 nifi.flowfile.repository.checkpoint.interval=${nifi.flowfile.repository.checkpoint.interval}
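The `${...}` placeholders in this `nifi.properties` template are presumably filled from the properties declared in `nifi-resources/pom.xml` by Maven resource filtering, which is how the new default reaches the generated file. The following is a hypothetical illustration of that substitution only, not Maven's implementation; `PlaceholderFilter` is an invented name, and leaving unknown keys untouched is this sketch's own choice.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of ${key} substitution; not how Maven implements filtering.
final class PlaceholderFilter {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    private PlaceholderFilter() {
    }

    static String filter(final String template, final Map<String, String> values) {
        final Matcher m = PLACEHOLDER.matcher(template);
        final StringBuilder out = new StringBuilder();
        while (m.find()) {
            // Unknown keys keep their original ${...} text.
            final String replacement = values.getOrDefault(m.group(1), m.group());
            // quoteReplacement stops '$' and '\' in values from being read as group references.
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```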
