NIFI-4774: Implementation of SequentialAccessWriteAheadLog and updates to WriteAheadFlowFileRepository to make use of the new implementation instead of MinimalLockingWriteAheadLog.
Signed-off-by: Matthew Burgess <mattyb...@apache.org> This closes #2416 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0bcb241d Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0bcb241d Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0bcb241d Branch: refs/heads/master Commit: 0bcb241db30f6f2560d305de8a3f50184b731e08 Parents: 14fef2d Author: Mark Payne <marka...@hotmail.com> Authored: Wed Jan 17 14:24:04 2018 -0500 Committer: Matthew Burgess <mattyb...@apache.org> Committed: Mon Feb 19 09:26:01 2018 -0500 ---------------------------------------------------------------------- .../org/apache/nifi/wali/BlockingQueuePool.java | 56 +++ .../nifi/wali/ByteArrayDataOutputStream.java | 45 ++ .../org/apache/nifi/wali/HashMapSnapshot.java | 366 ++++++++++++++ .../org/apache/nifi/wali/JournalRecovery.java | 27 ++ .../org/apache/nifi/wali/JournalSummary.java | 35 ++ .../nifi/wali/LengthDelimitedJournal.java | 478 +++++++++++++++++++ .../java/org/apache/nifi/wali/ObjectPool.java | 25 + .../java/org/apache/nifi/wali/RecordLookup.java | 29 ++ .../wali/SequentialAccessWriteAheadLog.java | 320 +++++++++++++ .../org/apache/nifi/wali/SnapshotCapture.java | 29 ++ .../org/apache/nifi/wali/SnapshotRecovery.java | 58 +++ .../nifi/wali/StandardJournalRecovery.java | 45 ++ .../nifi/wali/StandardJournalSummary.java | 46 ++ .../nifi/wali/StandardSnapshotRecovery.java | 56 +++ .../org/apache/nifi/wali/WriteAheadJournal.java | 51 ++ .../apache/nifi/wali/WriteAheadSnapshot.java | 33 ++ .../org/wali/MinimalLockingWriteAheadLog.java | 13 +- .../src/main/java/org/wali/SyncListener.java | 10 + .../apache/nifi/wali/TestBlockingQueuePool.java | 115 +++++ .../apache/nifi/wali/TestHashMapSnapshot.java | 216 +++++++++ .../nifi/wali/TestLengthDelimitedJournal.java | 353 ++++++++++++++ .../wali/TestSequentialAccessWriteAheadLog.java | 345 +++++++++++++ .../src/test/java/org/wali/DummyRecord.java | 31 ++ 
.../test/java/org/wali/DummyRecordSerde.java | 57 ++- .../wali/TestMinimalLockingWriteAheadLog.java | 55 ++- .../WriteAheadFlowFileRepository.java | 114 ++++- 26 files changed, 2960 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/BlockingQueuePool.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/BlockingQueuePool.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/BlockingQueuePool.java new file mode 100644 index 0000000..1b3346b --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/BlockingQueuePool.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.wali; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.function.Supplier; + +public class BlockingQueuePool<T> implements ObjectPool<T> { + private final BlockingQueue<T> queue; + private final Supplier<T> creationFunction; + private final Predicate<T> reuseCheck; + private final Consumer<T> returnPreparation; + + public BlockingQueuePool(final int maxSize, final Supplier<T> creationFunction, final Predicate<T> reuseCheck, final Consumer<T> returnPreparation) { + this.queue = new LinkedBlockingQueue<>(maxSize); + this.creationFunction = creationFunction; + this.reuseCheck = reuseCheck; + this.returnPreparation = returnPreparation; + } + + @Override + public T borrowObject() { + final T existing = queue.poll(); + if (existing != null) { + return existing; + } + + return creationFunction.get(); + } + + @Override + public void returnObject(final T somethingBorrowed) { + if (reuseCheck.test(somethingBorrowed)) { + returnPreparation.accept(somethingBorrowed); + queue.offer(somethingBorrowed); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ByteArrayDataOutputStream.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ByteArrayDataOutputStream.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ByteArrayDataOutputStream.java new file mode 100644 index 0000000..1468d49 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ByteArrayDataOutputStream.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; + +/** + * A wrapper around a DataOutputStream, which wraps a ByteArrayOutputStream. + * This allows us to obtain the DataOutputStream itself so that we can perform + * writeXYZ methods and also allows us to obtain the underlying ByteArrayOutputStream + * for performing methods such as size(), reset(), writeTo() + */ +public class ByteArrayDataOutputStream { + private final ByteArrayOutputStream baos; + private final DataOutputStream dos; + + public ByteArrayDataOutputStream(final int intiialBufferSize) { + this.baos = new ByteArrayOutputStream(intiialBufferSize); + this.dos = new DataOutputStream(baos); + } + + public DataOutputStream getDataOutputStream() { + return dos; + } + + public ByteArrayOutputStream getByteArrayOutputStream() { + return baos; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java new 
file mode 100644 index 0000000..0dad62c --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wali.SerDe; +import org.wali.SerDeFactory; +import org.wali.UpdateType; + +public class HashMapSnapshot<T> implements WriteAheadSnapshot<T>, RecordLookup<T> { + private static final Logger logger = LoggerFactory.getLogger(HashMapSnapshot.class); + private static final int ENCODING_VERSION = 1; + + private 
final ConcurrentMap<Object, T> recordMap = new ConcurrentHashMap<>(); + private final SerDeFactory<T> serdeFactory; + private final Set<String> swapLocations = Collections.synchronizedSet(new HashSet<>()); + private final File storageDirectory; + + public HashMapSnapshot(final File storageDirectory, final SerDeFactory<T> serdeFactory) { + this.serdeFactory = serdeFactory; + this.storageDirectory = storageDirectory; + } + + private SnapshotHeader validateHeader(final DataInputStream dataIn) throws IOException { + final String snapshotClass = dataIn.readUTF(); + logger.debug("Snapshot Class Name for {} is {}", storageDirectory, snapshotClass); + if (!snapshotClass.equals(HashMapSnapshot.class.getName())) { + throw new IOException("Write-Ahead Log Snapshot located at " + storageDirectory + " was written using the " + + snapshotClass + " class; cannot restore using " + getClass().getName()); + } + + final int snapshotVersion = dataIn.readInt(); + logger.debug("Snapshot version for {} is {}", storageDirectory, snapshotVersion); + if (snapshotVersion > getVersion()) { + throw new IOException("Write-Ahead Log Snapshot located at " + storageDirectory + " was written using version " + + snapshotVersion + " of the " + snapshotClass + " class; cannot restore using Version " + getVersion()); + } + + final String serdeEncoding = dataIn.readUTF(); // ignore serde class name for now + logger.debug("Serde encoding for Snapshot at {} is {}", storageDirectory, serdeEncoding); + + final int serdeVersion = dataIn.readInt(); + logger.debug("Serde version for Snapshot at {} is {}", storageDirectory, serdeVersion); + + final long maxTransactionId = dataIn.readLong(); + logger.debug("Max Transaction ID for Snapshot at {} is {}", storageDirectory, maxTransactionId); + + final int numRecords = dataIn.readInt(); + logger.debug("Number of Records for Snapshot at {} is {}", storageDirectory, numRecords); + + final SerDe<T> serde = serdeFactory.createSerDe(serdeEncoding); + 
serde.readHeader(dataIn); + + return new SnapshotHeader(serde, serdeVersion, maxTransactionId, numRecords); + } + + @Override + public SnapshotRecovery<T> recover() throws IOException { + final File partialFile = getPartialFile(); + final File snapshotFile = getSnapshotFile(); + final boolean partialExists = partialFile.exists(); + final boolean snapshotExists = snapshotFile.exists(); + + // If there is no snapshot (which is the case before the first snapshot is ever created), then just + // return an empty recovery. + if (!partialExists && !snapshotExists) { + return SnapshotRecovery.emptyRecovery(); + } + + if (partialExists && snapshotExists) { + // both files exist -- assume NiFi crashed/died while checkpointing. Delete the partial file. + Files.delete(partialFile.toPath()); + } else if (partialExists) { + // partial exists but snapshot does not -- we must have completed + // creating the partial, deleted the snapshot + // but crashed before renaming the partial to the snapshot. Just + // rename partial to snapshot + Files.move(partialFile.toPath(), snapshotFile.toPath()); + } + + if (snapshotFile.length() == 0) { + logger.warn("{} Found 0-byte Snapshot file; skipping Snapshot file in recovery", this); + return SnapshotRecovery.emptyRecovery(); + } + + // At this point, we know the snapshotPath exists because if it didn't, then we either returned null + // or we renamed partialPath to snapshotPath. So just Recover from snapshotPath. + try (final DataInputStream dataIn = new DataInputStream(new BufferedInputStream(new FileInputStream(snapshotFile)))) { + // Ensure that the header contains the information that we expect and retrieve the relevant information from the header. 
+ final SnapshotHeader header = validateHeader(dataIn); + + final SerDe<T> serde = header.getSerDe(); + final int serdeVersion = header.getSerDeVersion(); + final int numRecords = header.getNumRecords(); + final long maxTransactionId = header.getMaxTransactionId(); + + // Read all of the records that we expect to receive. + for (int i = 0; i < numRecords; i++) { + final T record = serde.deserializeRecord(dataIn, serdeVersion); + if (record == null) { + throw new EOFException(); + } + + final UpdateType updateType = serde.getUpdateType(record); + if (updateType == UpdateType.DELETE) { + logger.warn("While recovering from snapshot, found record with type 'DELETE'; this record will not be restored"); + continue; + } + + logger.trace("Recovered from snapshot: {}", record); + recordMap.put(serde.getRecordIdentifier(record), record); + } + + // Determine the location of any swap files. + final int numSwapRecords = dataIn.readInt(); + final Set<String> swapLocations = new HashSet<>(); + for (int i = 0; i < numSwapRecords; i++) { + swapLocations.add(dataIn.readUTF()); + } + this.swapLocations.addAll(swapLocations); + + logger.info("{} restored {} Records and {} Swap Files from Snapshot, ending with Transaction ID {}", + new Object[] {this, numRecords, swapLocations.size(), maxTransactionId}); + + return new StandardSnapshotRecovery<>(recordMap, swapLocations, snapshotFile, maxTransactionId); + } + } + + @Override + public void update(final Collection<T> records) { + // This implementation of Snapshot keeps a ConcurrentHashMap of all 'active' records + // (meaning records that have not been removed and are not swapped out), keyed by the + // Record Identifier. It keeps only the most up-to-date version of the Record. This allows + // us to write the snapshot very quickly without having to re-process the journal files. + // For each update, then, we will update the record in the map. 
+ for (final T record : records) { + final Object recordId = serdeFactory.getRecordIdentifier(record); + final UpdateType updateType = serdeFactory.getUpdateType(record); + + switch (updateType) { + case DELETE: + recordMap.remove(recordId); + break; + case SWAP_OUT: + final String location = serdeFactory.getLocation(record); + if (location == null) { + logger.error("Received Record (ID=" + recordId + ") with UpdateType of SWAP_OUT but " + + "no indicator of where the Record is to be Swapped Out to; these records may be " + + "lost when the repository is restored!"); + } else { + recordMap.remove(recordId); + this.swapLocations.add(location); + } + break; + case SWAP_IN: + final String swapLocation = serdeFactory.getLocation(record); + if (swapLocation == null) { + logger.error("Received Record (ID=" + recordId + ") with UpdateType of SWAP_IN but no " + + "indicator of where the Record is to be Swapped In from; these records may be duplicated " + + "when the repository is restored!"); + } else { + swapLocations.remove(swapLocation); + } + recordMap.put(recordId, record); + break; + default: + recordMap.put(recordId, record); + break; + } + } + } + + @Override + public int getRecordCount() { + return recordMap.size(); + } + + @Override + public T lookup(final Object recordId) { + return recordMap.get(recordId); + } + + + @Override + public SnapshotCapture<T> prepareSnapshot(final long maxTransactionId) { + return new Snapshot(new HashMap<>(recordMap), new HashSet<>(swapLocations), maxTransactionId); + } + + private int getVersion() { + return ENCODING_VERSION; + } + + private File getPartialFile() { + return new File(storageDirectory, "checkpoint.partial"); + } + + private File getSnapshotFile() { + return new File(storageDirectory, "checkpoint"); + } + + @Override + public synchronized void writeSnapshot(final SnapshotCapture<T> snapshot) throws IOException { + final SerDe<T> serde = serdeFactory.createSerDe(null); + + final File snapshotFile = getSnapshotFile(); + 
final File partialFile = getPartialFile(); + + // We must ensure that we do not overwrite the existing Snapshot file directly because if NiFi were + // to be killed or crash when we are partially finished, we'd end up with no viable Snapshot file at all. + // To avoid this, we write to a 'partial' file, then delete the existing Snapshot file, if it exists, and + // rename 'partial' to Snaphsot. That way, if NiFi crashes, we can still restore the Snapshot by first looking + // for a Snapshot file and restoring it, if it exists. If it does not exist, then we restore from the partial file, + // assuming that NiFi crashed after deleting the Snapshot file and before renaming the partial file. + // + // If there is no Snapshot file currently but there is a Partial File, then this indicates + // that we have deleted the Snapshot file and failed to rename the Partial File. We don't want + // to overwrite the Partial file, because doing so could potentially lose data. Instead, we must + // first rename it to Snapshot and then write to the partial file. + if (!snapshotFile.exists() && partialFile.exists()) { + final boolean rename = partialFile.renameTo(snapshotFile); + if (!rename) { + throw new IOException("Failed to rename partial snapshot file " + partialFile + " to " + snapshotFile); + } + } + + // Write to the partial file. 
+ try (final FileOutputStream fileOut = new FileOutputStream(getPartialFile()); + final OutputStream bufferedOut = new BufferedOutputStream(fileOut); + final DataOutputStream dataOut = new DataOutputStream(bufferedOut)) { + + // Write out the header + dataOut.writeUTF(HashMapSnapshot.class.getName()); + dataOut.writeInt(getVersion()); + dataOut.writeUTF(serde.getClass().getName()); + dataOut.writeInt(serde.getVersion()); + dataOut.writeLong(snapshot.getMaxTransactionId()); + dataOut.writeInt(snapshot.getRecords().size()); + serde.writeHeader(dataOut); + + // Serialize each record + for (final T record : snapshot.getRecords().values()) { + logger.trace("Checkpointing {}", record); + serde.serializeRecord(record, dataOut); + } + + // Write out the number of swap locations, followed by the swap locations themselves. + dataOut.writeInt(snapshot.getSwapLocations().size()); + for (final String swapLocation : snapshot.getSwapLocations()) { + dataOut.writeUTF(swapLocation); + } + + // Ensure that we flush the Buffered Output Stream and then perform an fsync(). + // This ensures that the data is fully written to disk before we delete the existing snapshot. + dataOut.flush(); + fileOut.getChannel().force(false); + } + + // If the snapshot file exists, delete it + if (snapshotFile.exists()) { + if (!snapshotFile.delete()) { + logger.warn("Unable to delete existing Snapshot file " + snapshotFile); + } + } + + // Rename the partial file to Snapshot. 
+ final boolean rename = partialFile.renameTo(snapshotFile); + if (!rename) { + throw new IOException("Failed to rename partial snapshot file " + partialFile + " to " + snapshotFile); + } + } + + + public class Snapshot implements SnapshotCapture<T> { + private final Map<Object, T> records; + private final long maxTransactionId; + private final Set<String> swapLocations; + + public Snapshot(final Map<Object, T> records, final Set<String> swapLocations, final long maxTransactionId) { + this.records = records; + this.swapLocations = swapLocations; + this.maxTransactionId = maxTransactionId; + } + + @Override + public final Map<Object, T> getRecords() { + return records; + } + + @Override + public long getMaxTransactionId() { + return maxTransactionId; + } + + @Override + public Set<String> getSwapLocations() { + return swapLocations; + } + } + + private class SnapshotHeader { + private final SerDe<T> serde; + private final int serdeVersion; + private final int numRecords; + private final long maxTransactionId; + + public SnapshotHeader(final SerDe<T> serde, final int serdeVersion, final long maxTransactionId, final int numRecords) { + this.serde = serde; + this.serdeVersion = serdeVersion; + this.maxTransactionId = maxTransactionId; + this.numRecords = numRecords; + } + + public SerDe<T> getSerDe() { + return serde; + } + + public int getSerDeVersion() { + return serdeVersion; + } + + public long getMaxTransactionId() { + return maxTransactionId; + } + + public int getNumRecords() { + return numRecords; + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalRecovery.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalRecovery.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalRecovery.java new file mode 100644 index 0000000..2832396 
--- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalRecovery.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +public interface JournalRecovery { + + int getUpdateCount(); + + long getMaxTransactionId(); + + boolean isEOFExceptionEncountered(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalSummary.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalSummary.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalSummary.java new file mode 100644 index 0000000..ee21036 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalSummary.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +public interface JournalSummary { + /** + * @return the Transaction ID of the first transaction written + */ + long getFirstTransactionId(); + + /** + * @return the Transaction ID of the last transaction written + */ + long getLastTransactionId(); + + /** + * @return the number of transactions that were written to the journal + */ + int getTransactionCount(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java new file mode 100644 index 0000000..0b2a8d3 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.text.DecimalFormat; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.stream.io.ByteCountingInputStream; +import org.apache.nifi.stream.io.LimitingInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wali.SerDe; +import org.wali.SerDeFactory; +import org.wali.UpdateType; + +public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> { + private static final Logger logger = LoggerFactory.getLogger(LengthDelimitedJournal.class); + private static final JournalSummary INACTIVE_JOURNAL_SUMMARY = new StandardJournalSummary(-1L, -1L, 0); + private static final int JOURNAL_ENCODING_VERSION = 1; + private static final byte TRANSACTION_FOLLOWS = 64; + private static final byte JOURNAL_COMPLETE = 127; + private static final int NUL_BYTE = 0; + + private final File journalFile; + private final long initialTransactionId; + private final SerDeFactory<T> serdeFactory; + private final ObjectPool<ByteArrayDataOutputStream> 
streamPool; + + private SerDe<T> serde; + private FileOutputStream fileOut; + private BufferedOutputStream bufferedOut; + + private long currentTransactionId; + private int transactionCount; + private boolean headerWritten = false; + + private volatile boolean poisoned = false; + private volatile boolean closed = false; + private final ByteBuffer transactionPreamble = ByteBuffer.allocate(12); // guarded by synchronized block + + public LengthDelimitedJournal(final File journalFile, final SerDeFactory<T> serdeFactory, final ObjectPool<ByteArrayDataOutputStream> streamPool, final long initialTransactionId) { + this.journalFile = journalFile; + this.serdeFactory = serdeFactory; + this.serde = serdeFactory.createSerDe(null); + this.streamPool = streamPool; + + this.initialTransactionId = initialTransactionId; + this.currentTransactionId = initialTransactionId; + } + + private synchronized OutputStream getOutputStream() throws FileNotFoundException { + if (fileOut == null) { + fileOut = new FileOutputStream(journalFile); + bufferedOut = new BufferedOutputStream(fileOut); + } + + return bufferedOut; + } + + + @Override + public synchronized void writeHeader() throws IOException { + try { + final DataOutputStream outStream = new DataOutputStream(getOutputStream()); + outStream.writeUTF(LengthDelimitedJournal.class.getName()); + outStream.writeInt(JOURNAL_ENCODING_VERSION); + + serde = serdeFactory.createSerDe(null); + outStream.writeUTF(serde.getClass().getName()); + outStream.writeInt(serde.getVersion()); + + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DataOutputStream dos = new DataOutputStream(baos)) { + + serde.writeHeader(dos); + dos.flush(); + + final int serdeHeaderLength = baos.size(); + outStream.writeInt(serdeHeaderLength); + baos.writeTo(outStream); + } + + outStream.flush(); + } catch (final Throwable t) { + poison(t); + + final IOException ioe = (t instanceof IOException) ? 
(IOException) t : new IOException("Failed to create journal file " + journalFile, t); + logger.error("Failed to create new journal file {} due to {}", journalFile, ioe.toString(), ioe); + throw ioe; + } + + headerWritten = true; + } + + private synchronized SerDeAndVersion validateHeader(final DataInputStream in) throws IOException { + final String journalClassName = in.readUTF(); + logger.debug("Write Ahead Log Class Name for {} is {}", journalFile, journalClassName); + if (!LengthDelimitedJournal.class.getName().equals(journalClassName)) { + throw new IOException("Invalid header information - " + journalFile + " does not appear to be a valid journal file."); + } + + final int encodingVersion = in.readInt(); + logger.debug("Encoding version for {} is {}", journalFile, encodingVersion); + if (encodingVersion > JOURNAL_ENCODING_VERSION) { + throw new IOException("Cannot read journal file " + journalFile + " because it is encoded using veresion " + encodingVersion + + " but this version of the code only understands version " + JOURNAL_ENCODING_VERSION + " and below"); + } + + final String serdeClassName = in.readUTF(); + logger.debug("Serde Class Name for {} is {}", journalFile, serdeClassName); + final SerDe<T> serde; + try { + serde = serdeFactory.createSerDe(serdeClassName); + } catch (final IllegalArgumentException iae) { + throw new IOException("Cannot read journal file " + journalFile + " because the serializer/deserializer used was " + serdeClassName + + " but this repository is configured to use a different type of serializer/deserializer"); + } + + final int serdeVersion = in.readInt(); + logger.debug("Serde version is {}", serdeVersion); + if (serdeVersion > serde.getVersion()) { + throw new IOException("Cannot read journal file " + journalFile + " because it is encoded using veresion " + encodingVersion + + " of the serializer/deserializer but this version of the code only understands version " + serde.getVersion() + " and below"); + } + + final int 
serdeHeaderLength = in.readInt(); + final InputStream serdeHeaderIn = new LimitingInputStream(in, serdeHeaderLength); + final DataInputStream dis = new DataInputStream(serdeHeaderIn); + serde.readHeader(dis); + + return new SerDeAndVersion(serde, serdeVersion); + } + + + @Override + public void update(final Collection<T> records, final RecordLookup<T> recordLookup) throws IOException { + if (!headerWritten) { + throw new IllegalStateException("Cannot update journal file " + journalFile + " because no header has been written yet."); + } + + if (records.isEmpty()) { + return; + } + + checkState(); + + final ByteArrayDataOutputStream bados = streamPool.borrowObject(); + try { + for (final T record : records) { + final Object recordId = serde.getRecordIdentifier(record); + final T previousRecordState = recordLookup.lookup(recordId); + serde.serializeEdit(previousRecordState, record, bados.getDataOutputStream()); + } + + final ByteArrayOutputStream baos = bados.getByteArrayOutputStream(); + final OutputStream out = getOutputStream(); + + final long transactionId; + synchronized (this) { + transactionId = currentTransactionId++; + transactionCount++; + + transactionPreamble.clear(); + transactionPreamble.putLong(transactionId); + transactionPreamble.putInt(baos.size()); + + out.write(TRANSACTION_FOLLOWS); + out.write(transactionPreamble.array()); + baos.writeTo(out); + out.flush(); + } + + logger.debug("Wrote Transaction {} to journal {} with length {} and {} records", transactionId, journalFile, baos.size(), records.size()); + } catch (final Throwable t) { + poison(t); + throw t; + } finally { + streamPool.returnObject(bados); + } + } + + private void checkState() throws IOException { + if (poisoned) { + throw new IOException("Cannot update journal file " + journalFile + " because this journal has already encountered a failure when attempting to write to the file. " + + "If the repository is able to checkpoint, then this problem will resolve itself. 
However, if the repository is unable to be checkpointed " + + "(for example, due to being out of storage space or having too many open files), then this issue may require manual intervention."); + } + + if (closed) { + throw new IOException("Cannot update journal file " + journalFile + " because this journal has already been closed"); + } + } + + private void poison(final Throwable t) { + this.poisoned = true; + + try { + if (fileOut != null) { + fileOut.close(); + } + + closed = true; + } catch (final IOException innerIOE) { + t.addSuppressed(innerIOE); + } + } + + @Override + public synchronized void fsync() throws IOException { + checkState(); + + try { + if (fileOut != null) { + fileOut.getChannel().force(false); + } + } catch (final IOException ioe) { + poison(ioe); + } + } + + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + + closed = true; + + try { + if (fileOut != null) { + if (!poisoned) { + fileOut.write(JOURNAL_COMPLETE); + } + + fileOut.close(); + } + } catch (final IOException ioe) { + poison(ioe); + } + } + + @Override + public JournalRecovery recoverRecords(final Map<Object, T> recordMap, final Set<String> swapLocations) throws IOException { + long maxTransactionId = -1L; + int updateCount = 0; + + boolean eofException = false; + logger.info("Recovering records from journal {}", journalFile); + final double journalLength = journalFile.length(); + + try (final InputStream fis = new FileInputStream(journalFile); + final InputStream bufferedIn = new BufferedInputStream(fis); + final ByteCountingInputStream byteCountingIn = new ByteCountingInputStream(bufferedIn); + final DataInputStream in = new DataInputStream(byteCountingIn)) { + + try { + // Validate that the header is what we expect and obtain the appropriate SerDe and Version information + final SerDeAndVersion serdeAndVersion = validateHeader(in); + final SerDe<T> serde = serdeAndVersion.getSerDe(); + + // Ensure that we get a valid transaction 
indicator + int transactionIndicator = in.read(); + if (transactionIndicator != TRANSACTION_FOLLOWS && transactionIndicator != JOURNAL_COMPLETE && transactionIndicator != -1) { + throw new IOException("After reading " + byteCountingIn.getBytesConsumed() + " bytes from " + journalFile + ", encountered unexpected value of " + + transactionIndicator + " for the Transaction Indicator. This journal may have been corrupted."); + } + + long consumedAtLog = 0L; + + // We don't want to apply the updates in a transaction until we've finished recovering the entire + // transaction. Otherwise, we could apply say 8 out of 10 updates and then hit an EOF. In such a case, + // we want to rollback the entire transaction. We handle this by not updating recordMap or swapLocations + // variables directly but instead keeping track of the things that occurred and then once we've read the + // entire transaction, we can apply those updates to the recordMap and swapLocations. + final Map<Object, T> transactionRecordMap = new HashMap<>(); + final Set<Object> idsRemoved = new HashSet<>(); + final Set<String> swapLocationsRemoved = new HashSet<>(); + final Set<String> swapLocationsAdded = new HashSet<>(); + int transactionUpdates = 0; + + // While we have a transaction to recover, recover it + while (transactionIndicator == TRANSACTION_FOLLOWS) { + transactionRecordMap.clear(); + idsRemoved.clear(); + swapLocationsRemoved.clear(); + swapLocationsAdded.clear(); + transactionUpdates = 0; + + // Format is <Transaction ID: 8 bytes> <Transaction Length: 4 bytes> <Transaction data: # of bytes indicated by Transaction Length Field> + final long transactionId = in.readLong(); + maxTransactionId = Math.max(maxTransactionId, transactionId); + final int transactionLength = in.readInt(); + + // Use SerDe to deserialize the update. We use a LimitingInputStream to ensure that the SerDe is not able to read past its intended + // length, in case there is a bug in the SerDe. 
We then use a ByteCountingInputStream so that we can ensure that all of the data has + // been read and throw EOFException otherwise. + final InputStream transactionLimitingIn = new LimitingInputStream(in, transactionLength); + final ByteCountingInputStream transactionByteCountingIn = new ByteCountingInputStream(transactionLimitingIn); + final DataInputStream transactionDis = new DataInputStream(transactionByteCountingIn); + + while (transactionByteCountingIn.getBytesConsumed() < transactionLength) { + final T record = serde.deserializeEdit(transactionDis, recordMap, serdeAndVersion.getVersion()); + + // Update our RecordMap so that we have the most up-to-date version of the Record. + final Object recordId = serde.getRecordIdentifier(record); + final UpdateType updateType = serde.getUpdateType(record); + + switch (updateType) { + case DELETE: { + idsRemoved.add(recordId); + transactionRecordMap.remove(recordId); + break; + } + case SWAP_IN: { + final String location = serde.getLocation(record); + if (location == null) { + logger.error("Recovered SWAP_IN record from edit log, but it did not contain a Location; skipping record"); + } else { + swapLocationsRemoved.add(location); + swapLocationsAdded.remove(location); + transactionRecordMap.put(recordId, record); + } + break; + } + case SWAP_OUT: { + final String location = serde.getLocation(record); + if (location == null) { + logger.error("Recovered SWAP_OUT record from edit log, but it did not contain a Location; skipping record"); + } else { + swapLocationsRemoved.remove(location); + swapLocationsAdded.add(location); + idsRemoved.add(recordId); + transactionRecordMap.remove(recordId); + } + + break; + } + default: { + transactionRecordMap.put(recordId, record); + idsRemoved.remove(recordId); + break; + } + } + + transactionUpdates++; + } + + // Apply the transaction + for (final Object id : idsRemoved) { + recordMap.remove(id); + } + recordMap.putAll(transactionRecordMap); + 
swapLocations.removeAll(swapLocationsRemoved); + swapLocations.addAll(swapLocationsAdded); + updateCount += transactionUpdates; + + // Check if there is another transaction to read + transactionIndicator = in.read(); + if (transactionIndicator != TRANSACTION_FOLLOWS && transactionIndicator != JOURNAL_COMPLETE && transactionIndicator != -1) { + throw new IOException("After reading " + byteCountingIn.getBytesConsumed() + " bytes from " + journalFile + ", encountered unexpected value of " + + transactionIndicator + " for the Transaction Indicator. This journal may have been corrupted."); + } + + // If we have a very large journal (for instance, if checkpoint is not called for a long time, or if there is a problem rolling over + // the journal), then we want to occasionally notify the user that we are, in fact, making progress, so that it doesn't appear that + // NiFi has become "stuck". + final long consumed = byteCountingIn.getBytesConsumed(); + if (consumed - consumedAtLog > 50_000_000) { + final double percentage = consumed / journalLength * 100D; + final String pct = new DecimalFormat("#.00").format(percentage); + logger.info("{}% of the way finished recovering journal {}, having recovered {} updates", pct, journalFile, updateCount); + consumedAtLog = consumed; + } + } + } catch (final EOFException eof) { + eofException = true; + logger.warn("Encountered unexpected End-of-File when reading journal file {}; assuming that NiFi was shutdown unexpectedly and continuing recovery", journalFile); + } catch (final Exception e) { + // If the stream consists solely of NUL bytes, then we want to treat it + // the same as an EOF because we see this happen when we suddenly lose power + // while writing to a file. However, if that is not the case, then something else has gone wrong. + // In such a case, there is not much that we can do but to re-throw the Exception. 
+ if (remainingBytesAllNul(in)) { + logger.warn("Failed to recover some of the data from Write-Ahead Log Journal because encountered trailing NUL bytes. " + + "This will sometimes happen after a sudden power loss. The rest of this journal file will be skipped for recovery purposes." + + "The following Exception was encountered while recovering the updates to the journal:", e); + } else { + throw e; + } + } + } + + logger.info("Successfully recovered {} updates from journal {}", updateCount, journalFile); + return new StandardJournalRecovery(updateCount, maxTransactionId, eofException); + } + + /** + * In the case of a sudden power loss, it is common - at least in a Linux journaling File System - + * that the partition file that is being written to will have many trailing "NUL bytes" (0's). + * If this happens, then on restart we want to treat this as an incomplete transaction, so we detect + * this case explicitly. + * + * @param in the input stream to scan + * @return <code>true</code> if the InputStream contains no data or contains only NUL bytes + * @throws IOException if unable to read from the given InputStream + */ + private boolean remainingBytesAllNul(final InputStream in) throws IOException { + int nextByte; + while ((nextByte = in.read()) != -1) { + if (nextByte != NUL_BYTE) { + return false; + } + } + + return true; + } + + + @Override + public synchronized JournalSummary getSummary() { + if (transactionCount < 1) { + return INACTIVE_JOURNAL_SUMMARY; + } + + return new StandardJournalSummary(initialTransactionId, currentTransactionId - 1, transactionCount); + } + + private class SerDeAndVersion { + private final SerDe<T> serde; + private final int version; + + public SerDeAndVersion(final SerDe<T> serde, final int version) { + this.serde = serde; + this.version = version; + } + + public SerDe<T> getSerDe() { + return serde; + } + + public int getVersion() { + return version; + } + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ObjectPool.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ObjectPool.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ObjectPool.java new file mode 100644 index 0000000..16f1a36 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ObjectPool.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.nifi.wali; + + /** Pool of reusable objects: a caller takes an instance with borrowObject() and hands it back with returnObject() when finished (LengthDelimitedJournal.update does this in a finally block). */ public interface ObjectPool<T> { + + /** Obtains an object for the caller's exclusive use until it is passed to returnObject(). NOTE(review): whether an implementation creates, recycles, or blocks for instances is not visible here. */ T borrowObject(); + + /** Hands a previously borrowed object back to the pool. NOTE(review): implementations may reset it or decline to retain it; confirm in the implementation. */ void returnObject(T somethingBorrowed); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/RecordLookup.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/RecordLookup.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/RecordLookup.java new file mode 100644 index 0000000..4c7b834 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/RecordLookup.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + + package org.apache.nifi.wali; + + /** Resolves a record identifier to the currently known state of that record. The journal uses it to obtain a record's previous state when serializing an edit to that record. */ public interface RecordLookup<T> { + + /** + * Returns the Record with the given identifier, or <code>null</code> if no such record exists + * + * @param identifier the identifier of the record to lookup + * @return the Record with the given identifier, or <code>null</code> if no such record exists + */ + T lookup(Object identifier); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java new file mode 100644 index 0000000..e4a1db7 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.nifi.wali;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.SerDeFactory;
import org.wali.SyncListener;
import org.wali.WriteAheadRepository;

/**
 * <p>
 * This implementation of WriteAheadRepository provides the ability to write all updates to the
 * repository sequentially by writing to a single journal file. Serialization of data into bytes
 * happens outside of any lock contention and is done so using recycled byte buffers. As such,
 * we incur minimal garbage collection and the theoretical throughput of this repository is equal
 * to the throughput of the underlying disk itself.
 * </p>
 *
 * <p>
 * This implementation makes the assumption that only a single thread will ever issue updates for
 * a given Record at any one time. I.e., the implementation is thread-safe but cannot guarantee
 * that records are recovered correctly if two threads simultaneously update the write-ahead log
 * with updates for the same record.
 * </p>
 */
public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T> {
    // There is a single journal, so every update is reported as belonging to partition 0
    private static final int PARTITION_INDEX = 0;
    private static final Logger logger = LoggerFactory.getLogger(SequentialAccessWriteAheadLog.class);
    // Journal files are named <first transaction id>.journal
    private static final Pattern JOURNAL_FILENAME_PATTERN = Pattern.compile("\\d+\\.journal");
    private static final int MAX_BUFFERS = 64;
    private static final int BUFFER_SIZE = 256 * 1024;

    private final File storageDirectory;
    private final File journalsDirectory;
    private final SerDeFactory<T> serdeFactory;
    private final SyncListener syncListener;

    // Updates share the read lock; checkpoint takes the write lock while it swaps in a new journal
    private final ReadWriteLock journalRWLock = new ReentrantReadWriteLock();
    private final Lock journalReadLock = journalRWLock.readLock();
    private final Lock journalWriteLock = journalRWLock.writeLock();
    // NOTE(review): the predicate presumably keeps only buffers that did not grow past BUFFER_SIZE for reuse,
    // and the consumer resets a buffer before reuse - confirm against BlockingQueuePool
    private final ObjectPool<ByteArrayDataOutputStream> streamPool = new BlockingQueuePool<>(MAX_BUFFERS,
        () -> new ByteArrayDataOutputStream(BUFFER_SIZE),
        stream -> stream.getByteArrayOutputStream().size() < BUFFER_SIZE,
        stream -> stream.getByteArrayOutputStream().reset());

    private final WriteAheadSnapshot<T> snapshot;
    private final RecordLookup<T> recordLookup;
    private SnapshotRecovery<T> snapshotRecovery;

    private volatile boolean recovered = false;
    private WriteAheadJournal<T> journal;
    private volatile long nextTransactionId = 0L;

    /**
     * Creates a write-ahead log in the given directory that does not notify anyone of syncs.
     *
     * @param storageDirectory directory holding the snapshot and the "journals" subdirectory; created if missing
     * @param serdeFactory factory for the SerDe used to serialize records
     * @throws IOException if the directories cannot be created or storageDirectory is not a directory
     */
    public SequentialAccessWriteAheadLog(final File storageDirectory, final SerDeFactory<T> serdeFactory) throws IOException {
        this(storageDirectory, serdeFactory, SyncListener.NOP_SYNC_LISTENER);
    }

    /**
     * Creates a write-ahead log in the given directory.
     *
     * @param storageDirectory directory holding the snapshot and the "journals" subdirectory; created if missing
     * @param serdeFactory factory for the SerDe used to serialize records
     * @param syncListener notified after forced syncs and checkpoints; {@code null} means no notifications
     * @throws IOException if the directories cannot be created or storageDirectory is not a directory
     */
    public SequentialAccessWriteAheadLog(final File storageDirectory, final SerDeFactory<T> serdeFactory, final SyncListener syncListener) throws IOException {
        if (!storageDirectory.exists() && !storageDirectory.mkdirs()) {
            throw new IOException("Directory " + storageDirectory + " does not exist and cannot be created");
        }
        if (!storageDirectory.isDirectory()) {
            throw new IOException("File " + storageDirectory + " is a regular file and not a directory");
        }

        final HashMapSnapshot<T> hashMapSnapshot = new HashMapSnapshot<>(storageDirectory, serdeFactory);
        this.snapshot = hashMapSnapshot;
        this.recordLookup = hashMapSnapshot;

        this.storageDirectory = storageDirectory;
        this.journalsDirectory = new File(storageDirectory, "journals");
        if (!journalsDirectory.exists() && !journalsDirectory.mkdirs()) {
            throw new IOException("Directory " + journalsDirectory + " does not exist and cannot be created");
        }

        this.serdeFactory = serdeFactory;
        this.syncListener = (syncListener == null) ? SyncListener.NOP_SYNC_LISTENER : syncListener;
    }

    /**
     * Appends the records to the current journal, optionally fsyncs it, and then applies them to the in-memory snapshot.
     *
     * @param records the updated records
     * @param forceSync whether to force the journal to disk before returning
     * @return the partition index, always 0 for this implementation
     * @throws IllegalStateException if {@link #recoverRecords()} has not been called
     * @throws IOException if the journal cannot be written or synced
     */
    @Override
    public int update(final Collection<T> records, final boolean forceSync) throws IOException {
        if (!recovered) {
            throw new IllegalStateException("Cannot update repository until record recovery has been performed");
        }

        journalReadLock.lock();
        try {
            journal.update(records, recordLookup);

            if (forceSync) {
                journal.fsync();
                syncListener.onSync(PARTITION_INDEX);
            }

            snapshot.update(records);
        } finally {
            journalReadLock.unlock();
        }

        return PARTITION_INDEX;
    }

    /**
     * Recovers the snapshot, replays every journal not already covered by it (in order of starting transaction ID),
     * and then checkpoints so that the repository starts from a consistent state. May only be called once.
     *
     * @return the recovered records
     * @throws IllegalStateException if recovery has already been started
     * @throws IOException if the snapshot or a journal cannot be read
     */
    @Override
    public synchronized Collection<T> recoverRecords() throws IOException {
        if (recovered) {
            throw new IllegalStateException("Cannot recover records from repository because record recovery has already commenced");
        }

        logger.info("Recovering records from Write-Ahead Log at {}", storageDirectory);

        final long recoverStart = System.nanoTime();
        recovered = true;
        snapshotRecovery = snapshot.recover();

        final long snapshotRecoveryMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - recoverStart);

        final Map<Object, T> recoveredRecords = snapshotRecovery.getRecords();
        final Set<String> swapLocations = snapshotRecovery.getRecoveredSwapLocations();

        final File[] journalFiles = journalsDirectory.listFiles(this::isJournalFile);
        if (journalFiles == null) {
            throw new IOException("Cannot access the list of files in directory " + journalsDirectory + "; please ensure that appropriate file permissions are set.");
        }

        if (snapshotRecovery.getRecoveryFile() == null) {
            logger.info("No Snapshot File to recover from at {}. Now recovering records from {} journal files", storageDirectory, journalFiles.length);
        } else {
            logger.info("Successfully recovered {} records and {} swap files from Snapshot at {} with Max Transaction ID of {} in {} milliseconds. Now recovering records from {} journal files",
                recoveredRecords.size(), swapLocations.size(), snapshotRecovery.getRecoveryFile(), snapshotRecovery.getMaxTransactionId(),
                snapshotRecoveryMillis, journalFiles.length);
        }

        // Replay journals oldest-first, ordered by the starting transaction ID encoded in the filename
        final List<File> orderedJournalFiles = Arrays.asList(journalFiles);
        Collections.sort(orderedJournalFiles, Comparator.comparingLong(this::getMinTransactionId));

        final long snapshotTransactionId = snapshotRecovery.getMaxTransactionId();

        int totalUpdates = 0;
        int journalFilesRecovered = 0;
        int journalFilesSkipped = 0;
        long maxTransactionId = snapshotTransactionId;

        for (final File journalFile : orderedJournalFiles) {
            final long journalMinTransactionId = getMinTransactionId(journalFile);
            if (journalMinTransactionId < snapshotTransactionId) {
                logger.debug("Will not recover records from journal file {} because the minimum Transaction ID for that journal is {} and the Transaction ID recovered from Snapshot was {}",
                    journalFile, journalMinTransactionId, snapshotTransactionId);

                journalFilesSkipped++;
                continue;
            }

            logger.debug("Min Transaction ID for journal {} is {}, so will recover records from journal", journalFile, journalMinTransactionId);
            journalFilesRecovered++;

            try (final WriteAheadJournal<T> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
                final JournalRecovery journalRecovery = journal.recoverRecords(recoveredRecords, swapLocations);
                final int updates = journalRecovery.getUpdateCount();

                logger.debug("Recovered {} updates from journal {}", updates, journalFile);
                totalUpdates += updates;
                maxTransactionId = Math.max(maxTransactionId, journalRecovery.getMaxTransactionId());
            }
        }

        logger.debug("Recovered {} updates from {} journal files and skipped {} journal files because their data was already encapsulated in the snapshot",
            totalUpdates, journalFilesRecovered, journalFilesSkipped);
        this.nextTransactionId = maxTransactionId + 1;

        final long recoverNanos = System.nanoTime() - recoverStart;
        final long recoveryMillis = TimeUnit.NANOSECONDS.toMillis(recoverNanos);
        logger.info("Successfully recovered {} records in {} milliseconds. Now checkpointing to ensure that Write-Ahead Log is in a consistent state", recoveredRecords.size(), recoveryMillis);

        checkpoint();

        return recoveredRecords.values();
    }

    /**
     * Parses the starting transaction ID from a journal filename of the form {@code <id>.journal}.
     */
    private long getMinTransactionId(final File journalFile) {
        final String filename = journalFile.getName();
        final String numeral = filename.substring(0, filename.indexOf("."));
        return Long.parseLong(numeral);
    }

    /**
     * Returns whether the given file is a regular file whose name matches {@code <digits>.journal}.
     */
    private boolean isJournalFile(final File file) {
        if (!file.isFile()) {
            return false;
        }

        final String filename = file.getName();
        return JOURNAL_FILENAME_PATTERN.matcher(filename).matches();
    }

    /**
     * Returns the swap locations obtained from the snapshot recovery.
     *
     * @throws IllegalStateException if {@link #recoverRecords()} has not been called
     */
    @Override
    public synchronized Set<String> getRecoveredSwapLocations() throws IOException {
        if (!recovered) {
            throw new IllegalStateException("Cannot retrieve the Recovered Swap Locations until record recovery has been performed");
        }

        return snapshotRecovery.getRecoveredSwapLocations();
    }

    /**
     * Rolls over to a new journal and writes a new snapshot, then deletes the journals the snapshot supersedes.
     * Only the journal rollover and the in-memory snapshot capture run while holding the write lock ("stop-the-world");
     * writing the snapshot and deleting old journals happen after the lock is released. If the current journal
     * has no transactions, nothing is done.
     *
     * @return the number of records in the snapshot
     * @throws IOException if the journal cannot be synced/closed, the new journal cannot be created, or the snapshot cannot be written
     */
    @Override
    public int checkpoint() throws IOException {
        final SnapshotCapture<T> snapshotCapture;

        final long startNanos = System.nanoTime();
        final File[] existingJournals;
        journalWriteLock.lock();
        try {
            if (journal != null) {
                final JournalSummary journalSummary = journal.getSummary();
                if (journalSummary.getTransactionCount() == 0) {
                    logger.debug("Will not checkpoint Write-Ahead Log because no updates have occurred since last checkpoint");
                    return snapshot.getRecordCount();
                }

                journal.fsync();
                journal.close();

                nextTransactionId = Math.max(nextTransactionId, journalSummary.getLastTransactionId() + 1);
            }

            syncListener.onGlobalSync();

            // Listed before the new journal is created, so the new journal is never among those deleted below
            final File[] existingFiles = journalsDirectory.listFiles(this::isJournalFile);
            existingJournals = (existingFiles == null) ? new File[0] : existingFiles;

            snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 1);

            // Create a new journal. We name the journal file <next transaction id>.journal but it is possible
            // that we could have an empty journal file already created. If this happens, we don't want to create
            // a new file on top of it because it would get deleted below when we clean up old journals. So we
            // will simply increment our transaction ID and try again.
            File journalFile = new File(journalsDirectory, String.valueOf(nextTransactionId) + ".journal");
            while (journalFile.exists()) {
                nextTransactionId++;
                journalFile = new File(journalsDirectory, String.valueOf(nextTransactionId) + ".journal");
            }

            journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, nextTransactionId);
            journal.writeHeader();

            logger.debug("Created new Journal starting with Transaction ID {}", nextTransactionId);
        } finally {
            journalWriteLock.unlock();
        }

        final long stopTheWorldMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        snapshot.writeSnapshot(snapshotCapture);

        for (final File existingJournal : existingJournals) {
            logger.debug("Deleting Journal {} because it is now encapsulated in the latest Snapshot", existingJournal.getName());
            if (!existingJournal.delete() && existingJournal.exists()) {
                logger.warn("Unable to delete expired journal file {}; this file should be deleted manually.", existingJournal);
            }
        }

        final long totalNanos = System.nanoTime() - startNanos;
        final long millis = TimeUnit.NANOSECONDS.toMillis(totalNanos);
        logger.info("Checkpointed Write-Ahead Log with {} Records and {} Swap Files in {} milliseconds (Stop-the-world time = {} milliseconds), max Transaction ID {}",
            new Object[] {snapshotCapture.getRecords().size(), snapshotCapture.getSwapLocations().size(), millis, stopTheWorldMillis, snapshotCapture.getMaxTransactionId()});

        return snapshotCapture.getRecords().size();
    }


    /**
     * Closes the current journal, if any, while holding the write lock so no update is in progress.
     */
    @Override
    public void shutdown() throws IOException {
        journalWriteLock.lock();
        try {
            if (journal != null) {
                journal.close();
            }
        } finally {
            journalWriteLock.unlock();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotCapture.java ---------------------------------------------------------------------- diff --git
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotCapture.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotCapture.java new file mode 100644 index 0000000..5a24258 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotCapture.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.nifi.wali; + + import java.util.Map; + import java.util.Set; + + /** An in-memory capture of the repository state taken while checkpointing, which is then written out as the snapshot. */ public interface SnapshotCapture<T> { + /** The captured records. NOTE(review): keys are presumably the SerDe record identifiers - confirm in the implementation. */ Map<Object, T> getRecords(); + + /** The highest transaction ID reflected in this capture; SequentialAccessWriteAheadLog requests the capture with nextTransactionId - 1. */ long getMaxTransactionId(); + + /** The swap locations known at the time of the capture. */ Set<String> getSwapLocations(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotRecovery.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotRecovery.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotRecovery.java new file mode 100644 index 0000000..9b6339a --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotRecovery.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + + package org.apache.nifi.wali; + + import java.io.File; + import java.util.Collections; + import java.util.Map; + import java.util.Set; + + /** The result of recovering repository state from a snapshot. */ public interface SnapshotRecovery<T> { + /** The highest transaction ID contained in the snapshot; emptyRecovery() reports -1. */ long getMaxTransactionId(); + + /** The records recovered from the snapshot. */ Map<Object, T> getRecords(); + + /** The swap locations recovered from the snapshot. */ Set<String> getRecoveredSwapLocations(); + + /** The snapshot file that was recovered from, or null if there was none (as returned by emptyRecovery()). */ File getRecoveryFile(); + + + /** Returns a recovery representing the absence of a snapshot: max transaction ID -1, no records, no swap locations, and a null recovery file. NOTE(review): the map and set come from Collections.emptyMap() and Collections.emptySet() and are immutable, so a caller that applies journal updates directly to getRecords() would throw UnsupportedOperationException on this instance; confirm callers never do so. */ public static <T> SnapshotRecovery<T> emptyRecovery() { + return new SnapshotRecovery<T>() { + @Override + public long getMaxTransactionId() { + return -1L; + } + + @Override + public Map<Object, T> getRecords() { + return Collections.emptyMap(); + } + + @Override + public Set<String> getRecoveredSwapLocations() { + return Collections.emptySet(); + } + + @Override + public File getRecoveryFile() { + return null; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalRecovery.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalRecovery.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalRecovery.java new file mode 100644 index 0000000..459e777 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalRecovery.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+/**
+ * A {@link JournalRecovery} that reports exactly the values supplied at construction.
+ */
+public class StandardJournalRecovery implements JournalRecovery {
+    private final boolean eofEncountered;
+    private final long maxTxId;
+    private final int recoveredUpdates;
+
+    /**
+     * @param updateCount the value reported by {@link #getUpdateCount()}
+     * @param maxTransactionId the value reported by {@link #getMaxTransactionId()}
+     * @param eofException the value reported by {@link #isEOFExceptionEncountered()}
+     */
+    public StandardJournalRecovery(final int updateCount, final long maxTransactionId, final boolean eofException) {
+        recoveredUpdates = updateCount;
+        maxTxId = maxTransactionId;
+        eofEncountered = eofException;
+    }
+
+    @Override
+    public boolean isEOFExceptionEncountered() {
+        return eofEncountered;
+    }
+
+    @Override
+    public long getMaxTransactionId() {
+        return maxTxId;
+    }
+
+    @Override
+    public int getUpdateCount() {
+        return recoveredUpdates;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalSummary.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalSummary.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalSummary.java
new file mode 100644
index 0000000..0f3c60f
--- /dev/null
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalSummary.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+/**
+ * A {@link JournalSummary} that reports exactly the values supplied at construction.
+ */
+public class StandardJournalSummary implements JournalSummary {
+    private final int txCount;
+    private final long firstTxId;
+    private final long lastTxId;
+
+    /**
+     * @param firstTransactionId the value reported by {@link #getFirstTransactionId()}
+     * @param lastTransactionId the value reported by {@link #getLastTransactionId()}
+     * @param transactionCount the value reported by {@link #getTransactionCount()}
+     */
+    public StandardJournalSummary(final long firstTransactionId, final long lastTransactionId, final int transactionCount) {
+        txCount = transactionCount;
+        lastTxId = lastTransactionId;
+        firstTxId = firstTransactionId;
+    }
+
+    @Override
+    public int getTransactionCount() {
+        return txCount;
+    }
+
+    @Override
+    public long getFirstTransactionId() {
+        return firstTxId;
+    }
+
+    @Override
+    public long getLastTransactionId() {
+        return lastTxId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardSnapshotRecovery.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardSnapshotRecovery.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardSnapshotRecovery.java
new file mode 100644
index 0000000..70588dc
--- /dev/null
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardSnapshotRecovery.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software
Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A {@link SnapshotRecovery} that reports exactly the values supplied at construction.
+ * The record map and swap-location set are stored and returned as-is (no defensive copy),
+ * so callers share the very instances that were passed in.
+ *
+ * @param <T> the type of record that was recovered
+ */
+public class StandardSnapshotRecovery<T> implements SnapshotRecovery<T> {
+    private final long maxTxId;
+    private final Map<Object, T> records;
+    private final Set<String> swapLocations;
+    private final File file;
+
+    /**
+     * @param recordMap the map reported by {@link #getRecords()}
+     * @param recoveredSwapLocations the set reported by {@link #getRecoveredSwapLocations()}
+     * @param recoveryFile the file reported by {@link #getRecoveryFile()}
+     * @param maxTransactionId the value reported by {@link #getMaxTransactionId()}
+     */
+    public StandardSnapshotRecovery(final Map<Object, T> recordMap, final Set<String> recoveredSwapLocations, final File recoveryFile, final long maxTransactionId) {
+        this.maxTxId = maxTransactionId;
+        this.file = recoveryFile;
+        this.swapLocations = recoveredSwapLocations;
+        this.records = recordMap;
+    }
+
+    @Override
+    public File getRecoveryFile() {
+        return file;
+    }
+
+    @Override
+    public Set<String> getRecoveredSwapLocations() {
+        return swapLocations;
+    }
+
+    @Override
+    public Map<Object, T> getRecords() {
+        return records;
+    }
+
+    @Override
+    public long getMaxTransactionId() {
+        return maxTxId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java
---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java new file mode 100644 index 0000000..f35d47a --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.nifi.wali;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The journal side of the write-ahead log: records updates and supports recovering them.
+ *
+ * @param <T> the type of record written to the journal
+ */
+public interface WriteAheadJournal<T> extends Closeable {
+
+    /**
+     * Recovers the records written to this journal.
+     * NOTE(review): the given map and set are presumably updated in place with the recovered
+     * state; confirm against the implementation.
+     *
+     * @param recordMap the record map to recover into
+     * @param swapLocations the swap locations to recover into
+     * @return the outcome of the recovery (update count, max transaction ID, and whether an EOFException was encountered)
+     * @throws IOException if recovery fails due to an I/O error
+     */
+    JournalRecovery recoverRecords(Map<Object, T> recordMap, Set<String> swapLocations) throws IOException;
+
+    /**
+     * Updates the journal with the given set of records
+     *
+     * @param records the records to update
+     * @param recordLookup a lookup that can be used to access the current value of a record, given its ID
+     *
+     * @throws IOException if unable to write to the underlying storage mechanism
+     */
+    void update(Collection<T> records, RecordLookup<T> recordLookup) throws IOException;
+
+    /**
+     * Writes the journal header.
+     *
+     * @throws IOException if unable to write to the underlying storage mechanism
+     */
+    void writeHeader() throws IOException;
+
+    /**
+     * Syncs (fsync) the data written to the journal so far to the underlying storage mechanism.
+     *
+     * @throws IOException if unable to sync to the underlying storage mechanism
+     */
+    void fsync() throws IOException;
+
+    /**
+     * Returns information about what was written to the journal.
+     * This method declares no checked exception.
+     *
+     * @return A JournalSummary indicating what was written to the journal (first and last transaction IDs and the transaction count)
+     */
+    JournalSummary getSummary();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java
new file mode 100644
index 0000000..a4cbcd2
--- /dev/null
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * The snapshot side of the write-ahead log.
+ *
+ * @param <T> the type of record held by the snapshot
+ */
+public interface WriteAheadSnapshot<T> {
+    /**
+     * Captures state that a subsequent {@link #writeSnapshot(SnapshotCapture)} call can persist.
+     *
+     * @param maxTransactionId the maximum transaction ID to associate with the capture
+     * @return the captured state
+     */
+    SnapshotCapture<T> prepareSnapshot(long maxTransactionId);
+
+    /**
+     * Writes the given capture out.
+     *
+     * @param snapshot the capture to write, presumably one returned by {@link #prepareSnapshot(long)}
+     * @throws IOException if the snapshot cannot be written
+     */
+    void writeSnapshot(SnapshotCapture<T> snapshot) throws IOException;
+
+    /**
+     * Recovers state from a previously written snapshot.
+     *
+     * @return the recovered state
+     * @throws IOException if the snapshot cannot be read
+     */
+    SnapshotRecovery<T> recover() throws IOException;
+
+    /**
+     * Applies the given records to the snapshot's state.
+     * NOTE(review): whether this touches storage or only in-memory state is not visible here.
+     *
+     * @param records the records to apply
+     */
+    void update(Collection<T> records);
+
+    /**
+     * @return the number of records currently held by the snapshot
+     */
+    int getRecordCount();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
index 0914a79..eabac9d 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
@@ -61,6 +61,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
 
+import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,7 +74,12 @@ import org.slf4j.LoggerFactory;
  * </p>
  *
  * @param <T> type of record this WAL is for
+ *
+ * @deprecated This implementation is now deprecated in favor of {@link SequentialAccessWriteAheadLog}.
+ * This implementation, when given more than 1 partition, can have issues recovering after a sudden loss
+ * of power or an operating system crash.
  */
+@Deprecated
 public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepository<T> {
 
     private final Path basePath;
@@ -105,7 +111,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
     private volatile boolean recovered = false;
 
     public MinimalLockingWriteAheadLog(final Path path, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException {
-        this(new TreeSet<>(Collections.singleton(path)), partitionCount, new SingletonSerDeFactory<T>(serde), syncListener);
+        this(new TreeSet<>(Collections.singleton(path)), partitionCount, new SingletonSerDeFactory<>(serde), syncListener);
     }
 
     public MinimalLockingWriteAheadLog(final Path path, final int partitionCount, final SerDeFactory<T> serdeFactory, final SyncListener syncListener) throws IOException {
@@ -113,7 +119,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
     }
 
     public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException {
-        this(paths, partitionCount, new SingletonSerDeFactory<T>(serde), syncListener);
+        this(paths, partitionCount, new SingletonSerDeFactory<>(serde), syncListener);
     }
 
     /**
@@ -645,6 +651,9 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
         } finally {
             writeLock.unlock();
             lockChannel.close();
+
+            // Best-effort cleanup: the boolean result of File.delete() is not checked.
+            final File lockFile = new File(basePath.toFile(), "wali.lock");
+            lockFile.delete();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bcb241d/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SyncListener.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SyncListener.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SyncListener.java
index ffb11ca..7cc4fc0 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SyncListener.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SyncListener.java
@@ -59,4 +59,14 @@ public interface SyncListener {
      * {@link WriteAheadRepository#sync()} method.
      */
     void onGlobalSync();
+
+    /**
+     * A {@link SyncListener} whose {@code onSync} and {@code onGlobalSync} methods do nothing.
+     */
+    public static final SyncListener NOP_SYNC_LISTENER = new SyncListener() {
+        @Override
+        public void onSync(int partitionIndex) {
+        }
+
+        @Override
+        public void onGlobalSync() {
+        }
+    };
 }