FLUME-1761. FileChannel can NPE when log metadata file is empty (Brock Noland via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4ad7bc5b Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4ad7bc5b Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4ad7bc5b Branch: refs/heads/flume-1.3.0 Commit: 4ad7bc5b75526f812221f065281b8232a2ee1242 Parents: bf2676d Author: Hari Shreedharan <[email protected]> Authored: Tue Dec 18 11:17:33 2012 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Thu Dec 20 14:49:01 2012 -0800 ---------------------------------------------------------------------- .../flume/channel/file/CheckpointRebuilder.java | 8 +- .../java/org/apache/flume/channel/file/Commit.java | 5 +- .../apache/flume/channel/file/LogFileFactory.java | 10 ++ .../org/apache/flume/channel/file/LogFileV3.java | 13 ++- .../java/org/apache/flume/channel/file/Put.java | 4 +- .../org/apache/flume/channel/file/Rollback.java | 6 +- .../java/org/apache/flume/channel/file/Take.java | 5 +- .../flume/channel/file/TransactionEventRecord.java | 10 +- .../org/apache/flume/channel/file/TestLog.java | 99 +++++++++++++++ 9 files changed, 145 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/4ad7bc5b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java index 748f49a..6e64003 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java @@ -23,6 +23,8 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; import com.google.common.collect.SetMultimap; import com.google.common.collect.Sets; + +import java.io.EOFException; import java.io.File; import java.io.IOException; import java.util.Arrays; @@ -62,7 +64,11 @@ public class CheckpointRebuilder { LOG.info("Attempting to fast replay the log files."); List<LogFile.SequentialReader> logReaders = Lists.newArrayList(); for (File logFile : logFiles) { - logReaders.add(LogFileFactory.getSequentialReader(logFile, null)); + try { + logReaders.add(LogFileFactory.getSequentialReader(logFile, null)); + } catch(EOFException e) { + LOG.warn("Ignoring " + logFile + " due to EOF", e); + } } long transactionIDSeed = 0; long writeOrderIDSeed = 0; http://git-wip-us.apache.org/repos/asf/flume/blob/4ad7bc5b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java index 62f4451..3663244 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java @@ -26,6 +26,8 @@ import java.io.OutputStream; import org.apache.flume.channel.file.proto.ProtosFactory; +import com.google.common.base.Preconditions; + /** * Represents a Commit on disk */ @@ -55,7 +57,8 @@ class Commit extends TransactionEventRecord { } @Override void readProtos(InputStream in) throws IOException { - ProtosFactory.Commit commit = ProtosFactory.Commit.parseDelimitedFrom(in); + ProtosFactory.Commit commit = Preconditions.checkNotNull(ProtosFactory. + Commit.parseDelimitedFrom(in), "Commit cannot be null"); type = (short) commit.getType(); } http://git-wip-us.apache.org/repos/asf/flume/blob/4ad7bc5b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java index 4783448..1fe219a 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java @@ -18,6 +18,7 @@ */ package org.apache.flume.channel.file; +import java.io.EOFException; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; @@ -148,6 +149,15 @@ class LogFileFactory { if(tempMetadataFile.exists()) { tempMetadataFile.delete(); } + if(metaDataFile.length() == 0L) { + if(file.length() != 0L) { + String msg = String.format("MetaData file %s is empty, but log %s" + + " is of size %d", metaDataFile, file, file.length()); + throw new IllegalStateException(msg); + } + throw new EOFException(String.format("MetaData file %s is empty", + metaDataFile)); + } return new LogFileV3.SequentialReader(file, encryptionKeyProvider); } logFile = new RandomAccessFile(file, "r"); http://git-wip-us.apache.org/repos/asf/flume/blob/4ad7bc5b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java index b4c197e..aac7805 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java @@ -100,8 +100,9 @@ class LogFileV3 extends LogFile { ProtosFactory.LogFileMetaData read() throws IOException { FileInputStream inputStream = new FileInputStream(metaDataFile); try { - ProtosFactory.LogFileMetaData metaData = - ProtosFactory.LogFileMetaData.parseDelimitedFrom(inputStream); + ProtosFactory.LogFileMetaData metaData = Preconditions.checkNotNull( + ProtosFactory.LogFileMetaData. + parseDelimitedFrom(inputStream), "Metadata cannot be null"); if (metaData.getLogFileID() != logFileID) { throw new IOException("The file id of log file: " + logFile + " is different from expected " @@ -216,7 +217,8 @@ class LogFileV3 extends LogFile { FileInputStream inputStream = new FileInputStream(metaDataFile); try { ProtosFactory.LogFileMetaData metaData = - ProtosFactory.LogFileMetaData.parseDelimitedFrom(inputStream); + Preconditions.checkNotNull(ProtosFactory.LogFileMetaData. + parseDelimitedFrom(inputStream), "MetaData cannot be null"); int version = metaData.getVersion(); if(version != getVersion()) { throw new IOException("Version is " + Integer.toHexString(version) + @@ -295,8 +297,9 @@ class LogFileV3 extends LogFile { File metaDataFile = Serialization.getMetaDataFile(file); FileInputStream inputStream = new FileInputStream(metaDataFile); try { - ProtosFactory.LogFileMetaData metaData = - ProtosFactory.LogFileMetaData.parseDelimitedFrom(inputStream); + ProtosFactory.LogFileMetaData metaData = Preconditions.checkNotNull( + ProtosFactory.LogFileMetaData.parseDelimitedFrom(inputStream), + "MetaData cannot be null"); int version = metaData.getVersion(); if(version != getVersion()) { throw new IOException("Version is " + Integer.toHexString(version) + http://git-wip-us.apache.org/repos/asf/flume/blob/4ad7bc5b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java index d47b1c8..4235a79 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.flume.channel.file.proto.ProtosFactory; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.protobuf.ByteString; @@ -82,7 +83,8 @@ class Put extends TransactionEventRecord { } @Override void readProtos(InputStream in) throws IOException { - ProtosFactory.Put put = ProtosFactory.Put.parseDelimitedFrom(in); + ProtosFactory.Put put = Preconditions.checkNotNull(ProtosFactory. + Put.parseDelimitedFrom(in), "Put cannot be null"); Map<String, String> headers = Maps.newHashMap(); ProtosFactory.FlumeEvent protosEvent = put.getEvent(); for(ProtosFactory.FlumeEventHeader header : protosEvent.getHeadersList()) { http://git-wip-us.apache.org/repos/asf/flume/blob/4ad7bc5b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java index cc9ce86..335ad0b 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java @@ -26,6 +26,8 @@ import java.io.OutputStream; import org.apache.flume.channel.file.proto.ProtosFactory; +import com.google.common.base.Preconditions; + /** * Represents a Rollback on disk */ @@ -51,8 +53,8 @@ class Rollback extends TransactionEventRecord { @Override void readProtos(InputStream in) throws IOException { @SuppressWarnings("unused") - ProtosFactory.Rollback rollback = - ProtosFactory.Rollback.parseDelimitedFrom(in); + ProtosFactory.Rollback rollback = Preconditions.checkNotNull(ProtosFactory. + Rollback.parseDelimitedFrom(in), "Rollback cannot be null"); } @Override short getRecordType() { http://git-wip-us.apache.org/repos/asf/flume/blob/4ad7bc5b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java index e61bf7e..143143a 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java @@ -26,6 +26,8 @@ import java.io.OutputStream; import org.apache.flume.channel.file.proto.ProtosFactory; +import com.google.common.base.Preconditions; + /** * Represents a Take on disk */ @@ -70,7 +72,8 @@ class Take extends TransactionEventRecord { } @Override void readProtos(InputStream in) throws IOException { - ProtosFactory.Take take = ProtosFactory.Take.parseDelimitedFrom(in); + ProtosFactory.Take take = Preconditions.checkNotNull(ProtosFactory. + Take.parseDelimitedFrom(in), "Take cannot be null"); fileID = take.getFileID(); offset = take.getOffset(); } http://git-wip-us.apache.org/repos/asf/flume/blob/4ad7bc5b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java index 70098a0..073042f 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java @@ -190,8 +190,9 @@ abstract class TransactionEventRecord implements Writable { throws IOException { ByteArrayInputStream in = new ByteArrayInputStream(buffer); try { - ProtosFactory.TransactionEventHeader header = - ProtosFactory.TransactionEventHeader.parseDelimitedFrom(in); + ProtosFactory.TransactionEventHeader header = Preconditions. + checkNotNull(ProtosFactory.TransactionEventHeader. + parseDelimitedFrom(in), "Header cannot be null"); short type = (short)header.getType(); long transactionID = header.getTransactionID(); long writeOrderID = header.getWriteOrderID(); @@ -199,8 +200,9 @@ abstract class TransactionEventRecord implements Writable { newRecordForType(type, transactionID, writeOrderID); transactionEvent.readProtos(in); @SuppressWarnings("unused") - ProtosFactory.TransactionEventFooter footer = - ProtosFactory.TransactionEventFooter.parseDelimitedFrom(in); + ProtosFactory.TransactionEventFooter footer = Preconditions.checkNotNull( + ProtosFactory.TransactionEventFooter. + parseDelimitedFrom(in), "Footer cannot be null"); return transactionEvent; } finally { try { http://git-wip-us.apache.org/repos/asf/flume/blob/4ad7bc5b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java index bc7b3cf..f9dbba5 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java @@ -337,6 +337,105 @@ public class TestLog { LogUtils.sort(expected); Assert.assertEquals(expected, actual); } + @Test + public void testReplayFailsWithAllEmptyLogMetaDataNormalReplay() + throws IOException, InterruptedException { + doTestReplayFailsWithAllEmptyLogMetaData(false); + } + @Test + public void testReplayFailsWithAllEmptyLogMetaDataFastReplay() + throws IOException, InterruptedException { + doTestReplayFailsWithAllEmptyLogMetaData(true); + } + public void doTestReplayFailsWithAllEmptyLogMetaData(boolean useFastReplay) + throws IOException, InterruptedException { + // setup log with correct fast replay parameter + log.close(); + log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize( + MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir( + checkpointDir).setLogDirs(dataDirs) + .setChannelName("testlog").setUseFastReplay(useFastReplay).build(); + log.replay(); + FlumeEvent eventIn = TestUtils.newPersistableEvent(); + long transactionID = ++this.transactionID; + log.put(transactionID, eventIn); + log.commitPut(transactionID); + log.close(); + if(useFastReplay) { + FileUtils.deleteQuietly(checkpointDir); + Assert.assertTrue(checkpointDir.mkdir()); + } + List<File> logFiles = Lists.newArrayList(); + for (int i = 0; i < dataDirs.length; i++) { + logFiles.addAll(LogUtils.getLogs(dataDirs[i])); + } + Assert.assertTrue(logFiles.size() > 0); + for(File logFile : logFiles) { + File logFileMeta = Serialization.getMetaDataFile(logFile); + Assert.assertTrue(logFileMeta.delete()); + Assert.assertTrue(logFileMeta.createNewFile()); + } + log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize( + MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir( + checkpointDir).setLogDirs(dataDirs) + .setChannelName("testlog").setUseFastReplay(useFastReplay).build(); + try { + log.replay(); + Assert.fail(); + } catch(IllegalStateException expected) { + String msg = expected.getMessage(); + Assert.assertNotNull(msg); + Assert.assertTrue(msg, msg.contains(".meta is empty, but log")); + } + } + @Test + public void testReplaySucceedsWithUnusedEmptyLogMetaDataNormalReplay() + throws IOException, InterruptedException { + FlumeEvent eventIn = TestUtils.newPersistableEvent(); + long transactionID = ++this.transactionID; + FlumeEventPointer eventPointer = log.put(transactionID, eventIn); + log.commitPut(transactionID); // this is not required since + log.close(); + log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize( + MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir( + checkpointDir).setLogDirs(dataDirs) + .setChannelName("testlog").build(); + doTestReplaySucceedsWithUnusedEmptyLogMetaData(eventIn, eventPointer); + } + @Test + public void testReplaySucceedsWithUnusedEmptyLogMetaDataFastReplay() + throws IOException, InterruptedException { + FlumeEvent eventIn = TestUtils.newPersistableEvent(); + long transactionID = ++this.transactionID; + FlumeEventPointer eventPointer = log.put(transactionID, eventIn); + log.commitPut(transactionID); // this is not required since + log.close(); + FileUtils.deleteDirectory(checkpointDir); + Assert.assertTrue(checkpointDir.mkdir()); + log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize( + MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir( + checkpointDir).setLogDirs(dataDirs) + .setChannelName("testlog").setUseFastReplay(true).build(); + doTestReplaySucceedsWithUnusedEmptyLogMetaData(eventIn, eventPointer); + } + public void doTestReplaySucceedsWithUnusedEmptyLogMetaData(FlumeEvent eventIn, + FlumeEventPointer eventPointer) throws IOException, + InterruptedException { + for (int i = 0; i < dataDirs.length; i++) { + for(File logFile : LogUtils.getLogs(dataDirs[i])) { + if(logFile.length() == 0L) { + File logFileMeta = Serialization.getMetaDataFile(logFile); + Assert.assertTrue(logFileMeta.delete()); + Assert.assertTrue(logFileMeta.createNewFile()); + } + } + } + log.replay(); + FlumeEvent eventOut = log.get(eventPointer); + Assert.assertNotNull(eventOut); + Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders()); + Assert.assertArrayEquals(eventIn.getBody(), eventOut.getBody()); + } private void takeAndVerify(FlumeEventPointer eventPointerIn, FlumeEvent eventIn) throws IOException, InterruptedException { FlumeEventQueue queue = log.getFlumeEventQueue();
