Repository: nifi Updated Branches: refs/heads/master 6a75ab174 -> 292dd1d66
NIFI-3678: Ensure that we catch EOFException when reading header information from WAL Partition files; previously, we caught EOFExceptions when reading a 'record' from the WAL but not when reading header info NIFI-3678: If we have a transaction ID but then have no more data written to Partition file, we end up with a NPE. Added logic to avoid this and instead return null for the next record when this happens This closes #1656. Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/292dd1d6 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/292dd1d6 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/292dd1d6 Branch: refs/heads/master Commit: 292dd1d66b1726794f0d34523578727ea3a7fe08 Parents: 6a75ab1 Author: Mark Payne <[email protected]> Authored: Thu Apr 6 15:57:11 2017 -0400 Committer: Bryan Bende <[email protected]> Committed: Fri Apr 7 10:28:27 2017 -0400 ---------------------------------------------------------------------- .../org/wali/MinimalLockingWriteAheadLog.java | 32 ++++---- .../wali/TestMinimalLockingWriteAheadLog.java | 82 ++++++++++++++++++++ .../repository/SchemaRepositoryRecordSerde.java | 4 + 3 files changed, 104 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/292dd1d6/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java index 5334acb..8949073 100644 --- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java @@ -973,24 +973,28 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor logger.debug("{} recovering from {}", this, nextRecoveryPath); recoveryIn = createDataInputStream(nextRecoveryPath); if (hasMoreData(recoveryIn)) { - final String waliImplementationClass = recoveryIn.readUTF(); - if (!MinimalLockingWriteAheadLog.class.getName().equals(waliImplementationClass)) { - continue; - } + try { + final String waliImplementationClass = recoveryIn.readUTF(); + if (!MinimalLockingWriteAheadLog.class.getName().equals(waliImplementationClass)) { + continue; + } - final long waliVersion = recoveryIn.readInt(); - if (waliVersion > writeAheadLogVersion) { - throw new IOException("Cannot recovery from file " + nextRecoveryPath + " because it was written using " + final long waliVersion = recoveryIn.readInt(); + if (waliVersion > writeAheadLogVersion) { + throw new IOException("Cannot recovery from file " + nextRecoveryPath + " because it was written using " + "WALI version " + waliVersion + ", but the version used to restore it is only " + writeAheadLogVersion); - } - - final String serdeEncoding = recoveryIn.readUTF(); - this.recoveryVersion = recoveryIn.readInt(); - serde = serdeFactory.createSerDe(serdeEncoding); + } - serde.readHeader(recoveryIn); + final String serdeEncoding = recoveryIn.readUTF(); + this.recoveryVersion = recoveryIn.readInt(); + serde = serdeFactory.createSerDe(serdeEncoding); - break; + serde.readHeader(recoveryIn); + break; + } catch (final Exception e) { + logger.warn("Failed to recover data from Write-Ahead Log for {} because the header information could not be read properly. " + + "This often is the result of the file not being fully written out before the application is restarted. This file will be ignored.", nextRecoveryPath); + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/292dd1d6/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java index 5cdad82..cbca968 100644 --- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java +++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.EOFException; import java.io.File; import java.io.FileFilter; import java.io.FileInputStream; @@ -41,6 +42,7 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; @@ -54,6 +56,86 @@ public class TestMinimalLockingWriteAheadLog { @Test + public void testTruncatedPartitionHeader() throws IOException { + final int numPartitions = 4; + + final Path path = Paths.get("target/testTruncatedPartitionHeader"); + deleteRecursively(path.toFile()); + assertTrue(path.toFile().mkdirs()); + + final AtomicInteger counter = new AtomicInteger(0); + final SerDe<Object> serde = new SerDe<Object>() { + @Override + public void readHeader(DataInputStream in) throws IOException { + if (counter.getAndIncrement() == 1) { + throw new EOFException("Intentionally thrown for unit test"); + } + } + + @Override + public void serializeEdit(Object previousRecordState, Object newRecordState, DataOutputStream out) throws IOException { + out.write(1); + } + + @Override + public void serializeRecord(Object record, DataOutputStream out) throws IOException { + out.write(1); + } + + @Override + public Object deserializeEdit(DataInputStream in, Map<Object, Object> currentRecordStates, int version) throws IOException { + final int val = in.read(); + return (val == 1) ? new Object() : null; + } + + @Override + public Object deserializeRecord(DataInputStream in, int version) throws IOException { + final int val = in.read(); + return (val == 1) ? new Object() : null; + } + + @Override + public Object getRecordIdentifier(Object record) { + return 1; + } + + @Override + public UpdateType getUpdateType(Object record) { + return UpdateType.CREATE; + } + + @Override + public String getLocation(Object record) { + return null; + } + + @Override + public int getVersion() { + return 0; + } + }; + + final WriteAheadRepository<Object> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, (SyncListener) null); + try { + final Collection<Object> initialRecs = repo.recoverRecords(); + assertTrue(initialRecs.isEmpty()); + + repo.update(Collections.singletonList(new Object()), false); + repo.update(Collections.singletonList(new Object()), false); + repo.update(Collections.singletonList(new Object()), false); + } finally { + repo.shutdown(); + } + + final WriteAheadRepository<Object> secondRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, (SyncListener) null); + try { + secondRepo.recoverRecords(); + } finally { + secondRepo.shutdown(); + } + } + + @Test @Ignore("for local testing only") public void testUpdatePerformance() throws IOException, InterruptedException { final int numPartitions = 4; http://git-wip-us.apache.org/repos/asf/nifi/blob/292dd1d6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java index 75f6ff2..221f8ce 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java @@ -113,6 +113,10 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement public RepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException { final SchemaRecordReader reader = SchemaRecordReader.fromSchema(recoverySchema); final Record updateRecord = reader.readRecord(in); + if (updateRecord == null) { + // null may be returned by reader.readRecord() if it encounters end-of-stream + return null; + } // Top level is always going to be a "Repository Record Update" record because we need a 'Union' type record at the // top level that indicates which type of record we have.
