NIFI-4774: Implementation of SequentialAccessWriteAheadLog and updates to 
WriteAheadFlowFileRepository to make use of the new implementation instead of 
MinimalLockingWriteAheadLog.

Signed-off-by: Matthew Burgess <mattyb...@apache.org>

This closes #2416


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0bcb241d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0bcb241d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0bcb241d

Branch: refs/heads/master
Commit: 0bcb241db30f6f2560d305de8a3f50184b731e08
Parents: 14fef2d
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed Jan 17 14:24:04 2018 -0500
Committer: Matthew Burgess <mattyb...@apache.org>
Committed: Mon Feb 19 09:26:01 2018 -0500

----------------------------------------------------------------------
 .../org/apache/nifi/wali/BlockingQueuePool.java |  56 +++
 .../nifi/wali/ByteArrayDataOutputStream.java    |  45 ++
 .../org/apache/nifi/wali/HashMapSnapshot.java   | 366 ++++++++++++++
 .../org/apache/nifi/wali/JournalRecovery.java   |  27 ++
 .../org/apache/nifi/wali/JournalSummary.java    |  35 ++
 .../nifi/wali/LengthDelimitedJournal.java       | 478 +++++++++++++++++++
 .../java/org/apache/nifi/wali/ObjectPool.java   |  25 +
 .../java/org/apache/nifi/wali/RecordLookup.java |  29 ++
 .../wali/SequentialAccessWriteAheadLog.java     | 320 +++++++++++++
 .../org/apache/nifi/wali/SnapshotCapture.java   |  29 ++
 .../org/apache/nifi/wali/SnapshotRecovery.java  |  58 +++
 .../nifi/wali/StandardJournalRecovery.java      |  45 ++
 .../nifi/wali/StandardJournalSummary.java       |  46 ++
 .../nifi/wali/StandardSnapshotRecovery.java     |  56 +++
 .../org/apache/nifi/wali/WriteAheadJournal.java |  51 ++
 .../apache/nifi/wali/WriteAheadSnapshot.java    |  33 ++
 .../org/wali/MinimalLockingWriteAheadLog.java   |  13 +-
 .../src/main/java/org/wali/SyncListener.java    |  10 +
 .../apache/nifi/wali/TestBlockingQueuePool.java | 115 +++++
 .../apache/nifi/wali/TestHashMapSnapshot.java   | 216 +++++++++
 .../nifi/wali/TestLengthDelimitedJournal.java   | 353 ++++++++++++++
 .../wali/TestSequentialAccessWriteAheadLog.java | 345 +++++++++++++
 .../src/test/java/org/wali/DummyRecord.java     |  31 ++
 .../test/java/org/wali/DummyRecordSerde.java    |  57 ++-
 .../wali/TestMinimalLockingWriteAheadLog.java   |  55 ++-
 .../WriteAheadFlowFileRepository.java           | 114 ++++-
 26 files changed, 2960 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/BlockingQueuePool.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/BlockingQueuePool.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/BlockingQueuePool.java
new file mode 100644
index 0000000..1b3346b
--- /dev/null
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/BlockingQueuePool.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+public class BlockingQueuePool<T> implements ObjectPool<T> {
+    private final BlockingQueue<T> queue;
+    private final Supplier<T> creationFunction;
+    private final Predicate<T> reuseCheck;
+    private final Consumer<T> returnPreparation;
+
+    public BlockingQueuePool(final int maxSize, final Supplier<T> 
creationFunction, final Predicate<T> reuseCheck, final Consumer<T> 
returnPreparation) {
+        this.queue = new LinkedBlockingQueue<>(maxSize);
+        this.creationFunction = creationFunction;
+        this.reuseCheck = reuseCheck;
+        this.returnPreparation = returnPreparation;
+    }
+
+    @Override
+    public T borrowObject() {
+        final T existing = queue.poll();
+        if (existing != null) {
+            return existing;
+        }
+
+        return creationFunction.get();
+    }
+
+    @Override
+    public void returnObject(final T somethingBorrowed) {
+        if (reuseCheck.test(somethingBorrowed)) {
+            returnPreparation.accept(somethingBorrowed);
+            queue.offer(somethingBorrowed);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ByteArrayDataOutputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ByteArrayDataOutputStream.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ByteArrayDataOutputStream.java
new file mode 100644
index 0000000..1468d49
--- /dev/null
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ByteArrayDataOutputStream.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+
/**
 * A wrapper around a DataOutputStream, which wraps a ByteArrayOutputStream.
 * This allows us to obtain the DataOutputStream itself so that we can perform
 * writeXYZ methods and also allows us to obtain the underlying ByteArrayOutputStream
 * for performing methods such as size(), reset(), writeTo()
 */
public class ByteArrayDataOutputStream {
    private final ByteArrayOutputStream baos;
    private final DataOutputStream dos;

    /**
     * @param initialBufferSize the initial capacity, in bytes, of the underlying byte array buffer
     */
    public ByteArrayDataOutputStream(final int initialBufferSize) {
        // Fixed misspelled parameter name ('intiialBufferSize').
        this.baos = new ByteArrayOutputStream(initialBufferSize);
        this.dos = new DataOutputStream(baos);
    }

    /**
     * @return the DataOutputStream view, for writeInt/writeUTF/etc.
     */
    public DataOutputStream getDataOutputStream() {
        return dos;
    }

    /**
     * @return the backing ByteArrayOutputStream, for size(), reset(), writeTo()
     */
    public ByteArrayOutputStream getByteArrayOutputStream() {
        return baos;
    }
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java
new file mode 100644
index 0000000..0dad62c
--- /dev/null
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.SerDe;
+import org.wali.SerDeFactory;
+import org.wali.UpdateType;
+
+public class HashMapSnapshot<T> implements WriteAheadSnapshot<T>, 
RecordLookup<T> {
+    private static final Logger logger = 
LoggerFactory.getLogger(HashMapSnapshot.class);
+    private static final int ENCODING_VERSION = 1;
+
+    private final ConcurrentMap<Object, T> recordMap = new 
ConcurrentHashMap<>();
+    private final SerDeFactory<T> serdeFactory;
+    private final Set<String> swapLocations = Collections.synchronizedSet(new 
HashSet<>());
+    private final File storageDirectory;
+
+    public HashMapSnapshot(final File storageDirectory, final SerDeFactory<T> 
serdeFactory) {
+        this.serdeFactory = serdeFactory;
+        this.storageDirectory = storageDirectory;
+    }
+
+    private SnapshotHeader validateHeader(final DataInputStream dataIn) throws 
IOException {
+        final String snapshotClass = dataIn.readUTF();
+        logger.debug("Snapshot Class Name for {} is {}", storageDirectory, 
snapshotClass);
+        if (!snapshotClass.equals(HashMapSnapshot.class.getName())) {
+            throw new IOException("Write-Ahead Log Snapshot located at " + 
storageDirectory + " was written using the "
+                + snapshotClass + " class; cannot restore using " + 
getClass().getName());
+        }
+
+        final int snapshotVersion = dataIn.readInt();
+        logger.debug("Snapshot version for {} is {}", storageDirectory, 
snapshotVersion);
+        if (snapshotVersion > getVersion()) {
+            throw new IOException("Write-Ahead Log Snapshot located at " + 
storageDirectory + " was written using version "
+                + snapshotVersion + " of the " + snapshotClass + " class; 
cannot restore using Version " + getVersion());
+        }
+
+        final String serdeEncoding = dataIn.readUTF(); // ignore serde class 
name for now
+        logger.debug("Serde encoding for Snapshot at {} is {}", 
storageDirectory, serdeEncoding);
+
+        final int serdeVersion = dataIn.readInt();
+        logger.debug("Serde version for Snapshot at {} is {}", 
storageDirectory, serdeVersion);
+
+        final long maxTransactionId = dataIn.readLong();
+        logger.debug("Max Transaction ID for Snapshot at {} is {}", 
storageDirectory, maxTransactionId);
+
+        final int numRecords = dataIn.readInt();
+        logger.debug("Number of Records for Snapshot at {} is {}", 
storageDirectory, numRecords);
+
+        final SerDe<T> serde = serdeFactory.createSerDe(serdeEncoding);
+        serde.readHeader(dataIn);
+
+        return new SnapshotHeader(serde, serdeVersion, maxTransactionId, 
numRecords);
+    }
+
+    @Override
+    public SnapshotRecovery<T> recover() throws IOException {
+        final File partialFile = getPartialFile();
+        final File snapshotFile = getSnapshotFile();
+        final boolean partialExists = partialFile.exists();
+        final boolean snapshotExists = snapshotFile.exists();
+
+        // If there is no snapshot (which is the case before the first 
snapshot is ever created), then just
+        // return an empty recovery.
+        if (!partialExists && !snapshotExists) {
+            return SnapshotRecovery.emptyRecovery();
+        }
+
+        if (partialExists && snapshotExists) {
+            // both files exist -- assume NiFi crashed/died while 
checkpointing. Delete the partial file.
+            Files.delete(partialFile.toPath());
+        } else if (partialExists) {
+            // partial exists but snapshot does not -- we must have completed
+            // creating the partial, deleted the snapshot
+            // but crashed before renaming the partial to the snapshot. Just
+            // rename partial to snapshot
+            Files.move(partialFile.toPath(), snapshotFile.toPath());
+        }
+
+        if (snapshotFile.length() == 0) {
+            logger.warn("{} Found 0-byte Snapshot file; skipping Snapshot file 
in recovery", this);
+            return SnapshotRecovery.emptyRecovery();
+        }
+
+        // At this point, we know the snapshotPath exists because if it 
didn't, then we either returned null
+        // or we renamed partialPath to snapshotPath. So just Recover from 
snapshotPath.
+        try (final DataInputStream dataIn = new DataInputStream(new 
BufferedInputStream(new FileInputStream(snapshotFile)))) {
+            // Ensure that the header contains the information that we expect 
and retrieve the relevant information from the header.
+            final SnapshotHeader header = validateHeader(dataIn);
+
+            final SerDe<T> serde = header.getSerDe();
+            final int serdeVersion = header.getSerDeVersion();
+            final int numRecords = header.getNumRecords();
+            final long maxTransactionId = header.getMaxTransactionId();
+
+            // Read all of the records that we expect to receive.
+            for (int i = 0; i < numRecords; i++) {
+                final T record = serde.deserializeRecord(dataIn, serdeVersion);
+                if (record == null) {
+                    throw new EOFException();
+                }
+
+                final UpdateType updateType = serde.getUpdateType(record);
+                if (updateType == UpdateType.DELETE) {
+                    logger.warn("While recovering from snapshot, found record 
with type 'DELETE'; this record will not be restored");
+                    continue;
+                }
+
+                logger.trace("Recovered from snapshot: {}", record);
+                recordMap.put(serde.getRecordIdentifier(record), record);
+            }
+
+            // Determine the location of any swap files.
+            final int numSwapRecords = dataIn.readInt();
+            final Set<String> swapLocations = new HashSet<>();
+            for (int i = 0; i < numSwapRecords; i++) {
+                swapLocations.add(dataIn.readUTF());
+            }
+            this.swapLocations.addAll(swapLocations);
+
+            logger.info("{} restored {} Records and {} Swap Files from 
Snapshot, ending with Transaction ID {}",
+                new Object[] {this, numRecords, swapLocations.size(), 
maxTransactionId});
+
+            return new StandardSnapshotRecovery<>(recordMap, swapLocations, 
snapshotFile, maxTransactionId);
+        }
+    }
+
+    @Override
+    public void update(final Collection<T> records) {
+        // This implementation of Snapshot keeps a ConcurrentHashMap of all 
'active' records
+        // (meaning records that have not been removed and are not swapped 
out), keyed by the
+        // Record Identifier. It keeps only the most up-to-date version of the 
Record. This allows
+        // us to write the snapshot very quickly without having to re-process 
the journal files.
+        // For each update, then, we will update the record in the map.
+        for (final T record : records) {
+            final Object recordId = serdeFactory.getRecordIdentifier(record);
+            final UpdateType updateType = serdeFactory.getUpdateType(record);
+
+            switch (updateType) {
+                case DELETE:
+                    recordMap.remove(recordId);
+                    break;
+                case SWAP_OUT:
+                    final String location = serdeFactory.getLocation(record);
+                    if (location == null) {
+                        logger.error("Received Record (ID=" + recordId + ") 
with UpdateType of SWAP_OUT but "
+                            + "no indicator of where the Record is to be 
Swapped Out to; these records may be "
+                            + "lost when the repository is restored!");
+                    } else {
+                        recordMap.remove(recordId);
+                        this.swapLocations.add(location);
+                    }
+                    break;
+                case SWAP_IN:
+                    final String swapLocation = 
serdeFactory.getLocation(record);
+                    if (swapLocation == null) {
+                        logger.error("Received Record (ID=" + recordId + ") 
with UpdateType of SWAP_IN but no "
+                            + "indicator of where the Record is to be Swapped 
In from; these records may be duplicated "
+                            + "when the repository is restored!");
+                    } else {
+                        swapLocations.remove(swapLocation);
+                    }
+                    recordMap.put(recordId, record);
+                    break;
+                default:
+                    recordMap.put(recordId, record);
+                    break;
+            }
+        }
+    }
+
+    @Override
+    public int getRecordCount() {
+        return recordMap.size();
+    }
+
+    @Override
+    public T lookup(final Object recordId) {
+        return recordMap.get(recordId);
+    }
+
+
+    @Override
+    public SnapshotCapture<T> prepareSnapshot(final long maxTransactionId) {
+        return new Snapshot(new HashMap<>(recordMap), new 
HashSet<>(swapLocations), maxTransactionId);
+    }
+
+    private int getVersion() {
+        return ENCODING_VERSION;
+    }
+
+    private File getPartialFile() {
+        return new File(storageDirectory, "checkpoint.partial");
+    }
+
+    private File getSnapshotFile() {
+        return new File(storageDirectory, "checkpoint");
+    }
+
+    @Override
+    public synchronized void writeSnapshot(final SnapshotCapture<T> snapshot) 
throws IOException {
+        final SerDe<T> serde = serdeFactory.createSerDe(null);
+
+        final File snapshotFile = getSnapshotFile();
+        final File partialFile = getPartialFile();
+
+        // We must ensure that we do not overwrite the existing Snapshot file 
directly because if NiFi were
+        // to be killed or crash when we are partially finished, we'd end up 
with no viable Snapshot file at all.
+        // To avoid this, we write to a 'partial' file, then delete the 
existing Snapshot file, if it exists, and
+        // rename 'partial' to Snaphsot. That way, if NiFi crashes, we can 
still restore the Snapshot by first looking
+        // for a Snapshot file and restoring it, if it exists. If it does not 
exist, then we restore from the partial file,
+        // assuming that NiFi crashed after deleting the Snapshot file and 
before renaming the partial file.
+        //
+        // If there is no Snapshot file currently but there is a Partial File, 
then this indicates
+        // that we have deleted the Snapshot file and failed to rename the 
Partial File. We don't want
+        // to overwrite the Partial file, because doing so could potentially 
lose data. Instead, we must
+        // first rename it to Snapshot and then write to the partial file.
+        if (!snapshotFile.exists() && partialFile.exists()) {
+            final boolean rename = partialFile.renameTo(snapshotFile);
+            if (!rename) {
+                throw new IOException("Failed to rename partial snapshot file 
" + partialFile + " to " + snapshotFile);
+            }
+        }
+
+        // Write to the partial file.
+        try (final FileOutputStream fileOut = new 
FileOutputStream(getPartialFile());
+            final OutputStream bufferedOut = new BufferedOutputStream(fileOut);
+            final DataOutputStream dataOut = new 
DataOutputStream(bufferedOut)) {
+
+            // Write out the header
+            dataOut.writeUTF(HashMapSnapshot.class.getName());
+            dataOut.writeInt(getVersion());
+            dataOut.writeUTF(serde.getClass().getName());
+            dataOut.writeInt(serde.getVersion());
+            dataOut.writeLong(snapshot.getMaxTransactionId());
+            dataOut.writeInt(snapshot.getRecords().size());
+            serde.writeHeader(dataOut);
+
+            // Serialize each record
+            for (final T record : snapshot.getRecords().values()) {
+                logger.trace("Checkpointing {}", record);
+                serde.serializeRecord(record, dataOut);
+            }
+
+            // Write out the number of swap locations, followed by the swap 
locations themselves.
+            dataOut.writeInt(snapshot.getSwapLocations().size());
+            for (final String swapLocation : snapshot.getSwapLocations()) {
+                dataOut.writeUTF(swapLocation);
+            }
+
+            // Ensure that we flush the Buffered Output Stream and then 
perform an fsync().
+            // This ensures that the data is fully written to disk before we 
delete the existing snapshot.
+            dataOut.flush();
+            fileOut.getChannel().force(false);
+        }
+
+        // If the snapshot file exists, delete it
+        if (snapshotFile.exists()) {
+            if (!snapshotFile.delete()) {
+                logger.warn("Unable to delete existing Snapshot file " + 
snapshotFile);
+            }
+        }
+
+        // Rename the partial file to Snapshot.
+        final boolean rename = partialFile.renameTo(snapshotFile);
+        if (!rename) {
+            throw new IOException("Failed to rename partial snapshot file " + 
partialFile + " to " + snapshotFile);
+        }
+    }
+
+
+    public class Snapshot implements SnapshotCapture<T> {
+        private final Map<Object, T> records;
+        private final long maxTransactionId;
+        private final Set<String> swapLocations;
+
+        public Snapshot(final Map<Object, T> records, final Set<String> 
swapLocations, final long maxTransactionId) {
+            this.records = records;
+            this.swapLocations = swapLocations;
+            this.maxTransactionId = maxTransactionId;
+        }
+
+        @Override
+        public final Map<Object, T> getRecords() {
+            return records;
+        }
+
+        @Override
+        public long getMaxTransactionId() {
+            return maxTransactionId;
+        }
+
+        @Override
+        public Set<String> getSwapLocations() {
+            return swapLocations;
+        }
+    }
+
+    private class SnapshotHeader {
+        private final SerDe<T> serde;
+        private final int serdeVersion;
+        private final int numRecords;
+        private final long maxTransactionId;
+
+        public SnapshotHeader(final SerDe<T> serde, final int serdeVersion, 
final long maxTransactionId, final int numRecords) {
+            this.serde = serde;
+            this.serdeVersion = serdeVersion;
+            this.maxTransactionId = maxTransactionId;
+            this.numRecords = numRecords;
+        }
+
+        public SerDe<T> getSerDe() {
+            return serde;
+        }
+
+        public int getSerDeVersion() {
+            return serdeVersion;
+        }
+
+        public long getMaxTransactionId() {
+            return maxTransactionId;
+        }
+
+        public int getNumRecords() {
+            return numRecords;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalRecovery.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalRecovery.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalRecovery.java
new file mode 100644
index 0000000..2832396
--- /dev/null
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalRecovery.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
/**
 * Describes the outcome of recovering (replaying) a single write-ahead journal file.
 */
public interface JournalRecovery {

    /**
     * @return the number of updates that were recovered from the journal
     */
    int getUpdateCount();

    /**
     * @return the maximum Transaction ID encountered while recovering the journal
     */
    long getMaxTransactionId();

    /**
     * @return true if an EOFException was encountered while recovering (i.e. the journal
     *         was truncated, presumably by a crash mid-write), false otherwise
     */
    boolean isEOFExceptionEncountered();
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalSummary.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalSummary.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalSummary.java
new file mode 100644
index 0000000..ee21036
--- /dev/null
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalSummary.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
/**
 * Summarizes the transactions that were written to a single write-ahead journal file.
 */
public interface JournalSummary {
    /**
     * @return the Transaction ID of the first transaction written
     */
    long getFirstTransactionId();

    /**
     * @return the Transaction ID of the last transaction written
     */
    long getLastTransactionId();

    /**
     * @return the number of transactions that were written to the journal
     */
    int getTransactionCount();
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
new file mode 100644
index 0000000..0b2a8d3
--- /dev/null
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.text.DecimalFormat;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.stream.io.ByteCountingInputStream;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.SerDe;
+import org.wali.SerDeFactory;
+import org.wali.UpdateType;
+
+public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
+    private static final Logger logger = 
LoggerFactory.getLogger(LengthDelimitedJournal.class);
+    private static final JournalSummary INACTIVE_JOURNAL_SUMMARY = new 
StandardJournalSummary(-1L, -1L, 0);
+    private static final int JOURNAL_ENCODING_VERSION = 1;
+    private static final byte TRANSACTION_FOLLOWS = 64;
+    private static final byte JOURNAL_COMPLETE = 127;
+    private static final int NUL_BYTE = 0;
+
+    private final File journalFile;
+    private final long initialTransactionId;
+    private final SerDeFactory<T> serdeFactory;
+    private final ObjectPool<ByteArrayDataOutputStream> streamPool;
+
+    private SerDe<T> serde;
+    private FileOutputStream fileOut;
+    private BufferedOutputStream bufferedOut;
+
+    private long currentTransactionId;
+    private int transactionCount;
+    private boolean headerWritten = false;
+
+    private volatile boolean poisoned = false;
+    private volatile boolean closed = false;
+    private final ByteBuffer transactionPreamble = ByteBuffer.allocate(12); // 
guarded by synchronized block
+
+    public LengthDelimitedJournal(final File journalFile, final 
SerDeFactory<T> serdeFactory, final ObjectPool<ByteArrayDataOutputStream> 
streamPool, final long initialTransactionId) {
+        this.journalFile = journalFile;
+        this.serdeFactory = serdeFactory;
+        this.serde = serdeFactory.createSerDe(null);
+        this.streamPool = streamPool;
+
+        this.initialTransactionId = initialTransactionId;
+        this.currentTransactionId = initialTransactionId;
+    }
+
+    private synchronized OutputStream getOutputStream() throws 
FileNotFoundException {
+        if (fileOut == null) {
+            fileOut = new FileOutputStream(journalFile);
+            bufferedOut = new BufferedOutputStream(fileOut);
+        }
+
+        return bufferedOut;
+    }
+
+
+    @Override
+    public synchronized void writeHeader() throws IOException {
+        try {
+            final DataOutputStream outStream = new 
DataOutputStream(getOutputStream());
+            outStream.writeUTF(LengthDelimitedJournal.class.getName());
+            outStream.writeInt(JOURNAL_ENCODING_VERSION);
+
+            serde = serdeFactory.createSerDe(null);
+            outStream.writeUTF(serde.getClass().getName());
+            outStream.writeInt(serde.getVersion());
+
+            try (final ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+                final DataOutputStream dos = new DataOutputStream(baos)) {
+
+                serde.writeHeader(dos);
+                dos.flush();
+
+                final int serdeHeaderLength = baos.size();
+                outStream.writeInt(serdeHeaderLength);
+                baos.writeTo(outStream);
+            }
+
+            outStream.flush();
+        } catch (final Throwable t) {
+            poison(t);
+
+            final IOException ioe = (t instanceof IOException) ? (IOException) 
t : new IOException("Failed to create journal file " + journalFile, t);
+            logger.error("Failed to create new journal file {} due to {}", 
journalFile, ioe.toString(), ioe);
+            throw ioe;
+        }
+
+        headerWritten = true;
+    }
+
+    private synchronized SerDeAndVersion validateHeader(final DataInputStream 
in) throws IOException {
+        final String journalClassName = in.readUTF();
+        logger.debug("Write Ahead Log Class Name for {} is {}", journalFile, 
journalClassName);
+        if (!LengthDelimitedJournal.class.getName().equals(journalClassName)) {
+            throw new IOException("Invalid header information - " + 
journalFile + " does not appear to be a valid journal file.");
+        }
+
+        final int encodingVersion = in.readInt();
+        logger.debug("Encoding version for {} is {}", journalFile, 
encodingVersion);
+        if (encodingVersion > JOURNAL_ENCODING_VERSION) {
+            throw new IOException("Cannot read journal file " + journalFile + 
" because it is encoded using veresion " + encodingVersion
+                + " but this version of the code only understands version " + 
JOURNAL_ENCODING_VERSION + " and below");
+        }
+
+        final String serdeClassName = in.readUTF();
+        logger.debug("Serde Class Name for {} is {}", journalFile, 
serdeClassName);
+        final SerDe<T> serde;
+        try {
+            serde = serdeFactory.createSerDe(serdeClassName);
+        } catch (final IllegalArgumentException iae) {
+            throw new IOException("Cannot read journal file " + journalFile + 
" because the serializer/deserializer used was " + serdeClassName
+                + " but this repository is configured to use a different type 
of serializer/deserializer");
+        }
+
+        final int serdeVersion = in.readInt();
+        logger.debug("Serde version is {}", serdeVersion);
+        if (serdeVersion > serde.getVersion()) {
+            throw new IOException("Cannot read journal file " + journalFile + 
" because it is encoded using veresion " + encodingVersion
+                + " of the serializer/deserializer but this version of the 
code only understands version " + serde.getVersion() + " and below");
+        }
+
+        final int serdeHeaderLength = in.readInt();
+        final InputStream serdeHeaderIn = new LimitingInputStream(in, 
serdeHeaderLength);
+        final DataInputStream dis = new DataInputStream(serdeHeaderIn);
+        serde.readHeader(dis);
+
+        return new SerDeAndVersion(serde, serdeVersion);
+    }
+
+
+    @Override
+    public void update(final Collection<T> records, final RecordLookup<T> 
recordLookup) throws IOException {
+        if (!headerWritten) {
+            throw new IllegalStateException("Cannot update journal file " + 
journalFile + " because no header has been written yet.");
+        }
+
+        if (records.isEmpty()) {
+            return;
+        }
+
+        checkState();
+
+        final ByteArrayDataOutputStream bados = streamPool.borrowObject();
+        try {
+            for (final T record : records) {
+                final Object recordId = serde.getRecordIdentifier(record);
+                final T previousRecordState = recordLookup.lookup(recordId);
+                serde.serializeEdit(previousRecordState, record, 
bados.getDataOutputStream());
+            }
+
+            final ByteArrayOutputStream baos = 
bados.getByteArrayOutputStream();
+            final OutputStream out = getOutputStream();
+
+            final long transactionId;
+            synchronized (this) {
+                transactionId = currentTransactionId++;
+                transactionCount++;
+
+                transactionPreamble.clear();
+                transactionPreamble.putLong(transactionId);
+                transactionPreamble.putInt(baos.size());
+
+                out.write(TRANSACTION_FOLLOWS);
+                out.write(transactionPreamble.array());
+                baos.writeTo(out);
+                out.flush();
+            }
+
+            logger.debug("Wrote Transaction {} to journal {} with length {} 
and {} records", transactionId, journalFile, baos.size(), records.size());
+        } catch (final Throwable t) {
+            poison(t);
+            throw t;
+        } finally {
+            streamPool.returnObject(bados);
+        }
+    }
+
+    private void checkState() throws IOException {
+        if (poisoned) {
+            throw new IOException("Cannot update journal file " + journalFile 
+ " because this journal has already encountered a failure when attempting to 
write to the file. "
+                + "If the repository is able to checkpoint, then this problem 
will resolve itself. However, if the repository is unable to be checkpointed "
+                + "(for example, due to being out of storage space or having 
too many open files), then this issue may require manual intervention.");
+        }
+
+        if (closed) {
+            throw new IOException("Cannot update journal file " + journalFile 
+ " because this journal has already been closed");
+        }
+    }
+
+    private void poison(final Throwable t) {
+        this.poisoned = true;
+
+        try {
+            if (fileOut != null) {
+                fileOut.close();
+            }
+
+            closed = true;
+        } catch (final IOException innerIOE) {
+            t.addSuppressed(innerIOE);
+        }
+    }
+
+    @Override
+    public synchronized void fsync() throws IOException {
+        checkState();
+
+        try {
+            if (fileOut != null) {
+                fileOut.getChannel().force(false);
+            }
+        } catch (final IOException ioe) {
+            poison(ioe);
+        }
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+        if (closed) {
+            return;
+        }
+
+        closed = true;
+
+        try {
+            if (fileOut != null) {
+                if (!poisoned) {
+                    fileOut.write(JOURNAL_COMPLETE);
+                }
+
+                fileOut.close();
+            }
+        } catch (final IOException ioe) {
+            poison(ioe);
+        }
+    }
+
+    @Override
+    public JournalRecovery recoverRecords(final Map<Object, T> recordMap, 
final Set<String> swapLocations) throws IOException {
+        long maxTransactionId = -1L;
+        int updateCount = 0;
+
+        boolean eofException = false;
+        logger.info("Recovering records from journal {}", journalFile);
+        final double journalLength = journalFile.length();
+
+        try (final InputStream fis = new FileInputStream(journalFile);
+            final InputStream bufferedIn = new BufferedInputStream(fis);
+            final ByteCountingInputStream byteCountingIn = new 
ByteCountingInputStream(bufferedIn);
+            final DataInputStream in = new DataInputStream(byteCountingIn)) {
+
+            try {
+                // Validate that the header is what we expect and obtain the 
appropriate SerDe and Version information
+                final SerDeAndVersion serdeAndVersion = validateHeader(in);
+                final SerDe<T> serde = serdeAndVersion.getSerDe();
+
+                // Ensure that we get a valid transaction indicator
+                int transactionIndicator = in.read();
+                if (transactionIndicator != TRANSACTION_FOLLOWS && 
transactionIndicator != JOURNAL_COMPLETE && transactionIndicator != -1) {
+                    throw new IOException("After reading " + 
byteCountingIn.getBytesConsumed() + " bytes from " + journalFile + ", 
encountered unexpected value of "
+                        + transactionIndicator + " for the Transaction 
Indicator. This journal may have been corrupted.");
+                }
+
+                long consumedAtLog = 0L;
+
+                // We don't want to apply the updates in a transaction until 
we've finished recovering the entire
+                // transaction. Otherwise, we could apply say 8 out of 10 
updates and then hit an EOF. In such a case,
+                // we want to rollback the entire transaction. We handle this 
by not updating recordMap or swapLocations
+                // variables directly but instead keeping track of the things 
that occurred and then once we've read the
+                // entire transaction, we can apply those updates to the 
recordMap and swapLocations.
+                final Map<Object, T> transactionRecordMap = new HashMap<>();
+                final Set<Object> idsRemoved = new HashSet<>();
+                final Set<String> swapLocationsRemoved = new HashSet<>();
+                final Set<String> swapLocationsAdded = new HashSet<>();
+                int transactionUpdates = 0;
+
+                // While we have a transaction to recover, recover it
+                while (transactionIndicator == TRANSACTION_FOLLOWS) {
+                    transactionRecordMap.clear();
+                    idsRemoved.clear();
+                    swapLocationsRemoved.clear();
+                    swapLocationsAdded.clear();
+                    transactionUpdates = 0;
+
+                    // Format is <Transaction ID: 8 bytes> <Transaction 
Length: 4 bytes> <Transaction data: # of bytes indicated by Transaction Length 
Field>
+                    final long transactionId = in.readLong();
+                    maxTransactionId = Math.max(maxTransactionId, 
transactionId);
+                    final int transactionLength = in.readInt();
+
+                    // Use SerDe to deserialize the update. We use a 
LimitingInputStream to ensure that the SerDe is not able to read past its 
intended
+                    // length, in case there is a bug in the SerDe. We then 
use a ByteCountingInputStream so that we can ensure that all of the data has
+                    // been read and throw EOFException otherwise.
+                    final InputStream transactionLimitingIn = new 
LimitingInputStream(in, transactionLength);
+                    final ByteCountingInputStream transactionByteCountingIn = 
new ByteCountingInputStream(transactionLimitingIn);
+                    final DataInputStream transactionDis = new 
DataInputStream(transactionByteCountingIn);
+
+                    while (transactionByteCountingIn.getBytesConsumed() < 
transactionLength) {
+                        final T record = serde.deserializeEdit(transactionDis, 
recordMap, serdeAndVersion.getVersion());
+
+                        // Update our RecordMap so that we have the most 
up-to-date version of the Record.
+                        final Object recordId = 
serde.getRecordIdentifier(record);
+                        final UpdateType updateType = 
serde.getUpdateType(record);
+
+                        switch (updateType) {
+                            case DELETE: {
+                                idsRemoved.add(recordId);
+                                transactionRecordMap.remove(recordId);
+                                break;
+                            }
+                            case SWAP_IN: {
+                                final String location = 
serde.getLocation(record);
+                                if (location == null) {
+                                    logger.error("Recovered SWAP_IN record 
from edit log, but it did not contain a Location; skipping record");
+                                } else {
+                                    swapLocationsRemoved.add(location);
+                                    swapLocationsAdded.remove(location);
+                                    transactionRecordMap.put(recordId, record);
+                                }
+                                break;
+                            }
+                            case SWAP_OUT: {
+                                final String location = 
serde.getLocation(record);
+                                if (location == null) {
+                                    logger.error("Recovered SWAP_OUT record 
from edit log, but it did not contain a Location; skipping record");
+                                } else {
+                                    swapLocationsRemoved.remove(location);
+                                    swapLocationsAdded.add(location);
+                                    idsRemoved.add(recordId);
+                                    transactionRecordMap.remove(recordId);
+                                }
+
+                                break;
+                            }
+                            default: {
+                                transactionRecordMap.put(recordId, record);
+                                idsRemoved.remove(recordId);
+                                break;
+                            }
+                        }
+
+                        transactionUpdates++;
+                    }
+
+                    // Apply the transaction
+                    for (final Object id : idsRemoved) {
+                        recordMap.remove(id);
+                    }
+                    recordMap.putAll(transactionRecordMap);
+                    swapLocations.removeAll(swapLocationsRemoved);
+                    swapLocations.addAll(swapLocationsAdded);
+                    updateCount += transactionUpdates;
+
+                    // Check if there is another transaction to read
+                    transactionIndicator = in.read();
+                    if (transactionIndicator != TRANSACTION_FOLLOWS && 
transactionIndicator != JOURNAL_COMPLETE && transactionIndicator != -1) {
+                        throw new IOException("After reading " + 
byteCountingIn.getBytesConsumed() + " bytes from " + journalFile + ", 
encountered unexpected value of "
+                            + transactionIndicator + " for the Transaction 
Indicator. This journal may have been corrupted.");
+                    }
+
+                    // If we have a very large journal (for instance, if 
checkpoint is not called for a long time, or if there is a problem rolling over
+                    // the journal), then we want to occasionally notify the 
user that we are, in fact, making progress, so that it doesn't appear that
+                    // NiFi has become "stuck".
+                    final long consumed = byteCountingIn.getBytesConsumed();
+                    if (consumed - consumedAtLog > 50_000_000) {
+                        final double percentage = consumed / journalLength * 
100D;
+                        final String pct = new 
DecimalFormat("#.00").format(percentage);
+                        logger.info("{}% of the way finished recovering 
journal {}, having recovered {} updates", pct, journalFile, updateCount);
+                        consumedAtLog = consumed;
+                    }
+                }
+            } catch (final EOFException eof) {
+                eofException = true;
+                logger.warn("Encountered unexpected End-of-File when reading 
journal file {}; assuming that NiFi was shutdown unexpectedly and continuing 
recovery", journalFile);
+            } catch (final Exception e) {
+                // If the stream consists solely of NUL bytes, then we want to 
treat it
+                // the same as an EOF because we see this happen when we 
suddenly lose power
+                // while writing to a file. However, if that is not the case, 
then something else has gone wrong.
+                // In such a case, there is not much that we can do but to 
re-throw the Exception.
+                if (remainingBytesAllNul(in)) {
+                    logger.warn("Failed to recover some of the data from 
Write-Ahead Log Journal because encountered trailing NUL bytes. "
+                        + "This will sometimes happen after a sudden power 
loss. The rest of this journal file will be skipped for recovery purposes."
+                        + "The following Exception was encountered while 
recovering the updates to the journal:", e);
+                } else {
+                    throw e;
+                }
+            }
+        }
+
+        logger.info("Successfully recovered {} updates from journal {}", 
updateCount, journalFile);
+        return new StandardJournalRecovery(updateCount, maxTransactionId, 
eofException);
+    }
+
+    /**
+     * In the case of a sudden power loss, it is common - at least in a Linux 
journaling File System -
+     * that the partition file that is being written to will have many 
trailing "NUL bytes" (0's).
+     * If this happens, then on restart we want to treat this as an incomplete 
transaction, so we detect
+     * this case explicitly.
+     *
+     * @param in the input stream to scan
+     * @return <code>true</code> if the InputStream contains no data or 
contains only NUL bytes
+     * @throws IOException if unable to read from the given InputStream
+     */
+    private boolean remainingBytesAllNul(final InputStream in) throws 
IOException {
+        int nextByte;
+        while ((nextByte = in.read()) != -1) {
+            if (nextByte != NUL_BYTE) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+
+    @Override
+    public synchronized JournalSummary getSummary() {
+        if (transactionCount < 1) {
+            return INACTIVE_JOURNAL_SUMMARY;
+        }
+
+        return new StandardJournalSummary(initialTransactionId, 
currentTransactionId - 1, transactionCount);
+    }
+
+    private class SerDeAndVersion {
+        private final SerDe<T> serde;
+        private final int version;
+
+        public SerDeAndVersion(final SerDe<T> serde, final int version) {
+            this.serde = serde;
+            this.version = version;
+        }
+
+        public SerDe<T> getSerDe() {
+            return serde;
+        }
+
+        public int getVersion() {
+            return version;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ObjectPool.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ObjectPool.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ObjectPool.java
new file mode 100644
index 0000000..16f1a36
--- /dev/null
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ObjectPool.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
/**
 * A minimal pool of reusable objects, used to recycle expensive-to-allocate
 * objects (such as byte-array output streams) between callers.
 *
 * NOTE(review): this interface does not specify what happens when the pool is
 * empty (block vs. create-new) — confirm against the implementation in use.
 */
public interface ObjectPool<T> {

    /**
     * @return an object from the pool, to be handed back via {@link #returnObject(Object)}
     */
    T borrowObject();

    /**
     * Returns a previously borrowed object to the pool so it can be reused.
     *
     * @param somethingBorrowed the object being returned
     */
    void returnObject(T somethingBorrowed);
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/RecordLookup.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/RecordLookup.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/RecordLookup.java
new file mode 100644
index 0000000..4c7b834
--- /dev/null
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/RecordLookup.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
/**
 * A lookup from a record's identifier to its current (most recently known)
 * state, used when serializing incremental edits against a previous state.
 */
public interface RecordLookup<T> {

    /**
     * Returns the Record with the given identifier, or <code>null</code> if no such record exists
     *
     * @param identifier the identifier of the record to lookup
     * @return the Record with the given identifier, or <code>null</code> if no such record exists
     */
    T lookup(Object identifier);
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
new file mode 100644
index 0000000..e4a1db7
--- /dev/null
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.SerDeFactory;
+import org.wali.SyncListener;
+import org.wali.WriteAheadRepository;
+
+/**
+ * <p>
+ * This implementation of WriteAheadRepository provides the ability to write 
all updates to the
+ * repository sequentially by writing to a single journal file. Serialization 
of data into bytes
+ * happens outside of any lock contention and is done so using recycled byte 
buffers. As such,
+ * we occur minimal garbage collection and the theoretical throughput of this 
repository is equal
+ * to the throughput of the underlying disk itself.
+ * </p>
+ *
+ * <p>
+ * This implementation makes the assumption that only a single thread will 
ever issue updates for
+ * a given Record at any one time. I.e., the implementation is thread-safe but 
cannot guarantee
+ * that records are recovered correctly if two threads simultaneously update 
the write-ahead log
+ * with updates for the same record.
+ * </p>
+ */
+public class SequentialAccessWriteAheadLog<T> implements 
WriteAheadRepository<T> {
+    private static final int PARTITION_INDEX = 0;
+    private static final Logger logger = 
LoggerFactory.getLogger(SequentialAccessWriteAheadLog.class);
+    private static final Pattern JOURNAL_FILENAME_PATTERN = 
Pattern.compile("\\d+\\.journal");
+    private static final int MAX_BUFFERS = 64;
+    private static final int BUFFER_SIZE = 256 * 1024;
+
+    private final File storageDirectory;
+    private final File journalsDirectory;
+    private final SerDeFactory<T> serdeFactory;
+    private final SyncListener syncListener;
+
+    private final ReadWriteLock journalRWLock = new ReentrantReadWriteLock();
+    private final Lock journalReadLock = journalRWLock.readLock();
+    private final Lock journalWriteLock = journalRWLock.writeLock();
+    private final ObjectPool<ByteArrayDataOutputStream> streamPool = new 
BlockingQueuePool<>(MAX_BUFFERS,
+        () -> new ByteArrayDataOutputStream(BUFFER_SIZE),
+        stream -> stream.getByteArrayOutputStream().size() < BUFFER_SIZE,
+        stream -> stream.getByteArrayOutputStream().reset());
+
+    private final WriteAheadSnapshot<T> snapshot;
+    private final RecordLookup<T> recordLookup;
+    private SnapshotRecovery<T> snapshotRecovery;
+
+    private volatile boolean recovered = false;
+    private WriteAheadJournal<T> journal;
+    private volatile long nextTransactionId = 0L;
+
    /**
     * Creates a sequential-access write-ahead log rooted at the given directory,
     * using a no-op SyncListener.
     *
     * @param storageDirectory directory in which the snapshot and journals are stored
     * @param serdeFactory factory for the record serializer/deserializer
     * @throws IOException if the storage or journals directory cannot be created
     */
    public SequentialAccessWriteAheadLog(final File storageDirectory, final SerDeFactory<T> serdeFactory) throws IOException {
        this(storageDirectory, serdeFactory, SyncListener.NOP_SYNC_LISTENER);
    }
+
    /**
     * Creates a sequential-access write-ahead log rooted at the given directory.
     * The snapshot is kept directly in the storage directory; journal files live
     * in a "journals" subdirectory. Records are not available until
     * {@code recoverRecords()} has been called.
     *
     * @param storageDirectory directory in which the snapshot and journals are stored;
     *            created if it does not exist
     * @param serdeFactory factory for the record serializer/deserializer
     * @param syncListener listener notified when the journal is fsync'd; a no-op
     *            listener is substituted if <code>null</code>
     * @throws IOException if either directory cannot be created, or if storageDirectory
     *             exists but is a regular file
     */
    public SequentialAccessWriteAheadLog(final File storageDirectory, final SerDeFactory<T> serdeFactory, final SyncListener syncListener) throws IOException {
        if (!storageDirectory.exists() && !storageDirectory.mkdirs()) {
            throw new IOException("Directory " + storageDirectory + " does not exist and cannot be created");
        }
        if (!storageDirectory.isDirectory()) {
            throw new IOException("File " + storageDirectory + " is a regular file and not a directory");
        }

        // The HashMapSnapshot serves double duty: it is both the snapshot that is
        // written at checkpoint time and the lookup for a record's current state.
        final HashMapSnapshot<T> hashMapSnapshot = new HashMapSnapshot<>(storageDirectory, serdeFactory);
        this.snapshot = hashMapSnapshot;
        this.recordLookup = hashMapSnapshot;

        this.storageDirectory = storageDirectory;
        this.journalsDirectory = new File(storageDirectory, "journals");
        if (!journalsDirectory.exists() && !journalsDirectory.mkdirs()) {
            throw new IOException("Directory " + journalsDirectory + " does not exist and cannot be created");
        }

        recovered = false;

        this.serdeFactory = serdeFactory;
        this.syncListener = (syncListener == null) ? SyncListener.NOP_SYNC_LISTENER : syncListener;
    }
+
    /**
     * Writes the given records to the active journal and then applies them to the
     * in-memory snapshot, optionally forcing the journal to disk first.
     *
     * The journal *read* lock is held for the duration: multiple updates may proceed
     * concurrently against the same journal. NOTE(review): presumably the write lock
     * is taken elsewhere (checkpoint/rollover) to swap the journal out safely — confirm
     * against the rest of this class.
     *
     * @param records the records to update
     * @param forceSync if <code>true</code>, fsync the journal and notify the
     *            SyncListener before the snapshot is updated
     * @return the partition index that was updated (always 0; this implementation
     *         uses a single partition)
     * @throws IllegalStateException if recoverRecords() has not yet been called
     * @throws IOException if the journal write or fsync fails
     */
    @Override
    public int update(final Collection<T> records, final boolean forceSync) throws IOException {
        if (!recovered) {
            throw new IllegalStateException("Cannot update repository until record recovery has been performed");
        }

        journalReadLock.lock();
        try {
            journal.update(records, recordLookup);

            if (forceSync) {
                journal.fsync();
                syncListener.onSync(PARTITION_INDEX);
            }

            // Snapshot is updated only after the journal write (and optional fsync)
            // succeeds, so the snapshot never reflects an update the journal lost.
            snapshot.update(records);
        } finally {
            journalReadLock.unlock();
        }

        return PARTITION_INDEX;
    }
+
+    @Override
+    public synchronized Collection<T> recoverRecords() throws IOException {
+        if (recovered) {
+            throw new IllegalStateException("Cannot recover records from 
repository because record recovery has already commenced");
+        }
+
+        logger.info("Recovering records from Write-Ahead Log at {}", 
storageDirectory);
+
+        final long recoverStart = System.nanoTime();
+        recovered = true;
+        snapshotRecovery = snapshot.recover();
+
+        final long snapshotRecoveryMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - recoverStart);
+
+        final Map<Object, T> recoveredRecords = snapshotRecovery.getRecords();
+        final Set<String> swapLocations = 
snapshotRecovery.getRecoveredSwapLocations();
+
+        final File[] journalFiles = 
journalsDirectory.listFiles(this::isJournalFile);
+        if (journalFiles == null) {
+            throw new IOException("Cannot access the list of files in 
directory " + journalsDirectory + "; please ensure that appropriate file 
permissions are set.");
+        }
+
+        if (snapshotRecovery.getRecoveryFile() == null) {
+            logger.info("No Snapshot File to recover from at {}. Now 
recovering records from {} journal files", storageDirectory, 
journalFiles.length);
+        } else {
+            logger.info("Successfully recovered {} records and {} swap files 
from Snapshot at {} with Max Transaction ID of {} in {} milliseconds. Now 
recovering records from {} journal files",
+                recoveredRecords.size(), swapLocations.size(), 
snapshotRecovery.getRecoveryFile(), snapshotRecovery.getMaxTransactionId(),
+                snapshotRecoveryMillis, journalFiles.length);
+        }
+
+        final List<File> orderedJournalFiles = Arrays.asList(journalFiles);
+        Collections.sort(orderedJournalFiles, new Comparator<File>() {
+            @Override
+            public int compare(final File o1, final File o2) {
+                final long transactionId1 = getMinTransactionId(o1);
+                final long transactionId2 = getMinTransactionId(o2);
+
+                return Long.compare(transactionId1, transactionId2);
+            }
+        });
+
+        final long snapshotTransactionId = 
snapshotRecovery.getMaxTransactionId();
+
+        int totalUpdates = 0;
+        int journalFilesRecovered = 0;
+        int journalFilesSkipped = 0;
+        long maxTransactionId = snapshotTransactionId;
+
+        for (final File journalFile : orderedJournalFiles) {
+            final long journalMinTransactionId = 
getMinTransactionId(journalFile);
+            if (journalMinTransactionId < snapshotTransactionId) {
+                logger.debug("Will not recover records from journal file {} 
because the minimum Transaction ID for that journal is {} and the Transaction 
ID recovered from Snapshot was {}",
+                    journalFile, journalMinTransactionId, 
snapshotTransactionId);
+
+                journalFilesSkipped++;
+                continue;
+            }
+
+            logger.debug("Min Transaction ID for journal {} is {}, so will 
recover records from journal", journalFile, journalMinTransactionId);
+            journalFilesRecovered++;
+
+            try (final WriteAheadJournal<T> journal = new 
LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
+                final JournalRecovery journalRecovery = 
journal.recoverRecords(recoveredRecords, swapLocations);
+                final int updates = journalRecovery.getUpdateCount();
+
+                logger.debug("Recovered {} updates from journal {}", updates, 
journalFile);
+                totalUpdates += updates;
+                maxTransactionId = Math.max(maxTransactionId, 
journalRecovery.getMaxTransactionId());
+            }
+        }
+
+        logger.debug("Recovered {} updates from {} journal files and skipped 
{} journal files because their data was already encapsulated in the snapshot",
+            totalUpdates, journalFilesRecovered, journalFilesSkipped);
+        this.nextTransactionId = maxTransactionId + 1;
+
+        final long recoverNanos = System.nanoTime() - recoverStart;
+        final long recoveryMillis = 
TimeUnit.MILLISECONDS.convert(recoverNanos, TimeUnit.NANOSECONDS);
+        logger.info("Successfully recovered {} records in {} milliseconds. Now 
checkpointing to ensure that Write-Ahead Log is in a consistent state", 
recoveredRecords.size(), recoveryMillis);
+
+        checkpoint();
+
+        return recoveredRecords.values();
+    }
+
+    private long getMinTransactionId(final File journalFile) {
+        final String filename = journalFile.getName();
+        final String numeral = filename.substring(0, filename.indexOf("."));
+        return Long.parseLong(numeral);
+    }
+
+    private boolean isJournalFile(final File file) {
+        if (!file.isFile()) {
+            return false;
+        }
+
+        final String filename = file.getName();
+        return JOURNAL_FILENAME_PATTERN.matcher(filename).matches();
+    }
+
    /**
     * Returns the swap locations that were recovered from the snapshot.
     *
     * @return the set of swap locations recovered by {@link #recoverRecords()}
     * @throws IOException declared by the interface contract; not thrown by this implementation
     * @throws IllegalStateException if recoverRecords() has not yet been called
     */
    @Override
    public synchronized Set<String> getRecoveredSwapLocations() throws IOException {
        if (!recovered) {
            throw new IllegalStateException("Cannot retrieve the Recovered Swap Locations until record recovery has been performed");
        }

        return snapshotRecovery.getRecoveredSwapLocations();
    }
+
    /**
     * Checkpoints the Write-Ahead Log: flushes and closes the active journal, captures the
     * in-memory snapshot while holding the write lock (the "stop-the-world" section that
     * blocks all updates), starts a fresh journal, then — outside the lock — writes the
     * snapshot to disk and deletes the journal files that it now encapsulates.
     *
     * @return the number of records in the snapshot (or the current record count if no
     *         updates have occurred since the last checkpoint)
     * @throws IOException if unable to fsync/close the journal, create the new journal, or
     *         write the snapshot
     */
    @Override
    public int checkpoint() throws IOException {
        final SnapshotCapture<T> snapshotCapture;

        final long startNanos = System.nanoTime();
        final File[] existingJournals;
        journalWriteLock.lock();
        try {
            if (journal != null) {
                final JournalSummary journalSummary = journal.getSummary();
                // Nothing written since the last checkpoint: skip all of the work below.
                if (journalSummary.getTransactionCount() == 0) {
                    logger.debug("Will not checkpoint Write-Ahead Log because no updates have occurred since last checkpoint");
                    return snapshot.getRecordCount();
                }

                // Make the old journal durable before it is superseded.
                journal.fsync();
                journal.close();

                nextTransactionId = Math.max(nextTransactionId, journalSummary.getLastTransactionId() + 1);
            }

            syncListener.onGlobalSync();

            // Remember which journal files existed before the new one is created; they are
            // deleted below once the snapshot has been written.
            final File[] existingFiles = journalsDirectory.listFiles(this::isJournalFile);
            existingJournals = (existingFiles == null) ? new File[0] : existingFiles;

            snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 1);

            // Create a new journal. We name the journal file <next transaction id>.journal but it is possible
            // that we could have an empty journal file already created. If this happens, we don't want to create
            // a new file on top of it because it would get deleted below when we clean up old journals. So we
            // will simply increment our transaction ID and try again.
            File journalFile = new File(journalsDirectory, String.valueOf(nextTransactionId) + ".journal");
            while (journalFile.exists()) {
                nextTransactionId++;
                journalFile = new File(journalsDirectory, String.valueOf(nextTransactionId) + ".journal");
            }

            journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, nextTransactionId);
            journal.writeHeader();

            logger.debug("Created new Journal starting with Transaction ID {}", nextTransactionId);
        } finally {
            journalWriteLock.unlock();
        }

        // Everything below happens outside the lock so that updates may proceed while the
        // (potentially large) snapshot is written to disk.
        final long stopTheWorldMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        snapshot.writeSnapshot(snapshotCapture);

        for (final File existingJournal : existingJournals) {
            logger.debug("Deleting Journal {} because it is now encapsulated in the latest Snapshot", existingJournal.getName());
            if (!existingJournal.delete() && existingJournal.exists()) {
                logger.warn("Unable to delete expired journal file " + existingJournal + "; this file should be deleted manually.");
            }
        }

        final long totalNanos = System.nanoTime() - startNanos;
        final long millis = TimeUnit.NANOSECONDS.toMillis(totalNanos);
        logger.info("Checkpointed Write-Ahead Log with {} Records and {} Swap Files in {} milliseconds (Stop-the-world time = {} milliseconds), max Transaction ID {}",
            new Object[] {snapshotCapture.getRecords().size(), snapshotCapture.getSwapLocations().size(), millis, stopTheWorldMillis, snapshotCapture.getMaxTransactionId()});

        return snapshotCapture.getRecords().size();
    }
+
+
    /**
     * Closes the active journal, if one is open. Takes the write lock so that the close
     * cannot race with in-flight updates.
     *
     * @throws IOException if unable to close the journal
     */
    @Override
    public void shutdown() throws IOException {
        journalWriteLock.lock();
        try {
            if (journal != null) {
                journal.close();
            }
        } finally {
            journalWriteLock.unlock();
        }
    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotCapture.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotCapture.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotCapture.java
new file mode 100644
index 0000000..5a24258
--- /dev/null
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotCapture.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+import java.util.Map;
+import java.util.Set;
+
/**
 * A point-in-time capture of the contents of a Write-Ahead Log, taken so that the captured
 * state can subsequently be written out as a snapshot.
 *
 * @param <T> the type of record held by the capture
 */
public interface SnapshotCapture<T> {

    /**
     * @return the largest Transaction ID that is encapsulated in this capture
     */
    long getMaxTransactionId();

    /**
     * @return a mapping of record ID to record for every record in this capture
     */
    Map<Object, T> getRecords();

    /**
     * @return the set of swap locations encapsulated in this capture
     */
    Set<String> getSwapLocations();
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotRecovery.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotRecovery.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotRecovery.java
new file mode 100644
index 0000000..9b6339a
--- /dev/null
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotRecovery.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
/**
 * Encapsulates the result of recovering state from a snapshot of a Write-Ahead Log.
 *
 * @param <T> the type of record recovered
 */
public interface SnapshotRecovery<T> {

    /**
     * @return the largest Transaction ID that was recovered from the snapshot, or -1 if
     *         there was no snapshot to recover from
     */
    long getMaxTransactionId();

    /**
     * @return a mapping of record ID to record for every record recovered from the snapshot
     */
    Map<Object, T> getRecords();

    /**
     * @return the set of swap locations recovered from the snapshot
     */
    Set<String> getRecoveredSwapLocations();

    /**
     * @return the file from which the snapshot was recovered, or {@code null} if there was
     *         no snapshot file to recover from
     */
    File getRecoveryFile();

    /**
     * Returns a SnapshotRecovery that represents the absence of any snapshot: no records,
     * no swap locations, no recovery file, and a max Transaction ID of -1.
     *
     * @param <T> the type of record
     * @return an empty SnapshotRecovery
     */
    // Note: interface methods are implicitly public, so the modifier is omitted.
    static <T> SnapshotRecovery<T> emptyRecovery() {
        return new SnapshotRecovery<T>() {
            @Override
            public long getMaxTransactionId() {
                return -1L;
            }

            @Override
            public Map<Object, T> getRecords() {
                return Collections.emptyMap();
            }

            @Override
            public Set<String> getRecoveredSwapLocations() {
                return Collections.emptySet();
            }

            @Override
            public File getRecoveryFile() {
                return null;
            }
        };
    }
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalRecovery.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalRecovery.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalRecovery.java
new file mode 100644
index 0000000..459e777
--- /dev/null
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalRecovery.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+public class StandardJournalRecovery implements JournalRecovery {
+    private final int updateCount;
+    private final long maxTransactionId;
+    private final boolean eofException;
+
+    public StandardJournalRecovery(final int updateCount, final long 
maxTransactionId, final boolean eofException) {
+        this.updateCount = updateCount;
+        this.maxTransactionId = maxTransactionId;
+        this.eofException = eofException;
+    }
+
+    @Override
+    public int getUpdateCount() {
+        return updateCount;
+    }
+
+    @Override
+    public long getMaxTransactionId() {
+        return maxTransactionId;
+    }
+
+    @Override
+    public boolean isEOFExceptionEncountered() {
+        return eofException;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalSummary.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalSummary.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalSummary.java
new file mode 100644
index 0000000..0f3c60f
--- /dev/null
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalSummary.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+public class StandardJournalSummary implements JournalSummary {
+    private final long firstTransactionId;
+    private final long lastTransactionId;
+    private final int transactionCount;
+
+    public StandardJournalSummary(final long firstTransactionId, final long 
lastTransactionId, final int transactionCount) {
+        this.firstTransactionId = firstTransactionId;
+        this.lastTransactionId = lastTransactionId;
+        this.transactionCount = transactionCount;
+    }
+
+    @Override
+    public long getFirstTransactionId() {
+        return firstTransactionId;
+    }
+
+    @Override
+    public long getLastTransactionId() {
+        return lastTransactionId;
+    }
+
+    @Override
+    public int getTransactionCount() {
+        return transactionCount;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardSnapshotRecovery.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardSnapshotRecovery.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardSnapshotRecovery.java
new file mode 100644
index 0000000..70588dc
--- /dev/null
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardSnapshotRecovery.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+
+public class StandardSnapshotRecovery<T> implements SnapshotRecovery<T> {
+    private final Map<Object, T> recordMap;
+    private final Set<String> recoveredSwapLocations;
+    private final File recoveryFile;
+    private final long maxTransactionId;
+
+    public StandardSnapshotRecovery(final Map<Object, T> recordMap, final 
Set<String> recoveredSwapLocations, final File recoveryFile, final long 
maxTransactionId) {
+        this.recordMap = recordMap;
+        this.recoveredSwapLocations = recoveredSwapLocations;
+        this.recoveryFile = recoveryFile;
+        this.maxTransactionId = maxTransactionId;
+    }
+
+    @Override
+    public long getMaxTransactionId() {
+        return maxTransactionId;
+    }
+
+    @Override
+    public Map<Object, T> getRecords() {
+        return recordMap;
+    }
+
+    @Override
+    public Set<String> getRecoveredSwapLocations() {
+        return recoveredSwapLocations;
+    }
+
+    @Override
+    public File getRecoveryFile() {
+        return recoveryFile;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java
new file mode 100644
index 0000000..f35d47a
--- /dev/null
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
/**
 * A single journal file of a Write-Ahead Log: updates are appended to the journal and can
 * later be replayed via {@link #recoverRecords(Map, Set)}.
 *
 * @param <T> the type of record stored in the journal
 */
public interface WriteAheadJournal<T> extends Closeable {

    /**
     * Replays the updates in this journal, applying them to the given record map and set of
     * swap locations (both are updated in place).
     *
     * @param recordMap mapping of record ID to record that the journal's updates are applied to
     * @param swapLocations set of swap locations that the journal's updates are applied to
     * @return a JournalRecovery describing what was recovered
     * @throws IOException if unable to read from the underlying storage mechanism
     */
    JournalRecovery recoverRecords(Map<Object, T> recordMap, Set<String> swapLocations) throws IOException;

    /**
     * Updates the journal with the given set of records
     *
     * @param records the records to update
     * @param recordLookup a lookup that can be used to access the current value of a record, given its ID
     *
     * @throws IOException if unable to write to the underlying storage mechanism
     */
    void update(Collection<T> records, RecordLookup<T> recordLookup) throws IOException;

    /**
     * Writes the journal's header to the underlying storage mechanism. Must be called
     * before any updates are written.
     *
     * @throws IOException if unable to write to the underlying storage mechanism
     */
    void writeHeader() throws IOException;

    /**
     * Forces any buffered updates to durable storage.
     *
     * @throws IOException if unable to sync the underlying storage mechanism
     */
    void fsync() throws IOException;

    /**
     * Returns information about what was written to the journal
     *
     * @return A JournalSummary indicating what was written to the journal
     */
    JournalSummary getSummary();
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java
new file mode 100644
index 0000000..a4cbcd2
--- /dev/null
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+import java.io.IOException;
+import java.util.Collection;
+
/**
 * Maintains the in-memory record state of a Write-Ahead Log and provides the means to
 * capture that state, write it to disk as a snapshot, and recover it again.
 *
 * @param <T> the type of record held by the snapshot
 */
public interface WriteAheadSnapshot<T> {
    /**
     * Captures the current record state so that it can later be persisted via
     * {@link #writeSnapshot(SnapshotCapture)}.
     *
     * @param maxTransactionId the largest Transaction ID to associate with the capture
     * @return a point-in-time capture of the current records and swap locations
     */
    SnapshotCapture<T> prepareSnapshot(long maxTransactionId);

    /**
     * Persists the given capture to the underlying storage mechanism.
     *
     * @param snapshot the capture to write
     * @throws IOException if unable to write the snapshot
     */
    void writeSnapshot(SnapshotCapture<T> snapshot) throws IOException;

    /**
     * Recovers the record state from the most recently written snapshot, if one exists.
     *
     * @return a SnapshotRecovery describing what was recovered
     * @throws IOException if unable to read the snapshot
     */
    SnapshotRecovery<T> recover() throws IOException;

    /**
     * Applies the given record updates to the in-memory state.
     *
     * @param records the records to apply
     */
    void update(Collection<T> records);

    /**
     * @return the number of records currently held in memory
     */
    int getRecordCount();
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
index 0914a79..eabac9d 100644
--- 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
@@ -61,6 +61,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
 
+import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,7 +74,12 @@ import org.slf4j.LoggerFactory;
  * </p>
  *
  * @param <T> type of record this WAL is for
+ *
+ * @deprecated This implementation is now deprecated in favor of {@link 
SequentialAccessWriteAheadLog}.
+ *             This implementation, when given more than 1 partition, can have 
issues recovering after a sudden loss
+ *             of power or an operating system crash.
  */
+@Deprecated
 public final class MinimalLockingWriteAheadLog<T> implements 
WriteAheadRepository<T> {
 
     private final Path basePath;
@@ -105,7 +111,7 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
     private volatile boolean recovered = false;
 
     public MinimalLockingWriteAheadLog(final Path path, final int 
partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws 
IOException {
-        this(new TreeSet<>(Collections.singleton(path)), partitionCount, new 
SingletonSerDeFactory<T>(serde), syncListener);
+        this(new TreeSet<>(Collections.singleton(path)), partitionCount, new 
SingletonSerDeFactory<>(serde), syncListener);
     }
 
     public MinimalLockingWriteAheadLog(final Path path, final int 
partitionCount, final SerDeFactory<T> serdeFactory, final SyncListener 
syncListener) throws IOException {
@@ -113,7 +119,7 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
     }
 
     public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int 
partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws 
IOException {
-        this(paths, partitionCount, new SingletonSerDeFactory<T>(serde), 
syncListener);
+        this(paths, partitionCount, new SingletonSerDeFactory<>(serde), 
syncListener);
     }
 
     /**
@@ -645,6 +651,9 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
         } finally {
             writeLock.unlock();
             lockChannel.close();
+
+            final File lockFile = new File(basePath.toFile(), "wali.lock");
+            lockFile.delete();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SyncListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SyncListener.java 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SyncListener.java
index ffb11ca..7cc4fc0 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SyncListener.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SyncListener.java
@@ -59,4 +59,14 @@ public interface SyncListener {
      * {@link WriteAheadRepository#sync()} method.
      */
     void onGlobalSync();
+
+    public static final SyncListener NOP_SYNC_LISTENER = new SyncListener() {
+        @Override
+        public void onSync(int partitionIndex) {
+        }
+
+        @Override
+        public void onGlobalSync() {
+        }
+    };
 }

Reply via email to