FLUME-1699: Make the rename of the meta file platform neutral (Hari Shreedharan via Brock Noland)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/019358d9 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/019358d9 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/019358d9 Branch: refs/heads/flume-1.3.0 Commit: 019358d926b9d6c470dfb0b449c4b9a4c31b7588 Parents: e3de092 Author: Brock Noland <[email protected]> Authored: Mon Nov 19 14:24:51 2012 -0600 Committer: Hari Shreedharan <[email protected]> Committed: Thu Dec 20 00:11:53 2012 -0800 ---------------------------------------------------------------------- .../apache/flume/channel/file/LogFileFactory.java | 40 ++++++++- .../org/apache/flume/channel/file/LogFileV3.java | 14 +++- .../apache/flume/channel/file/Serialization.java | 7 ++ .../org/apache/flume/channel/file/TestLogFile.java | 73 +++++++++++++++ 4 files changed, 132 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/019358d9/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java index 82e14b0..4783448 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java @@ -109,7 +109,45 @@ class LogFileFactory { RandomAccessFile logFile = null; try { File metaDataFile = Serialization.getMetaDataFile(file); - if(metaDataFile.exists()) { + File oldMetadataFile = Serialization.getOldMetaDataFile(file); + File tempMetadataFile = Serialization.getMetaDataTempFile(file); + boolean hasMeta = false; + // 
FLUME-1699: + // If the platform does not support atomic rename, then we + // renamed log.meta -> log.meta.old followed by log.meta.tmp -> log.meta + // I am not sure if all platforms maintain file integrity during + // file metadata update operations. So: + // 1. check if meta file exists + // 2. If 1 returns false, check if temp exists + // 3. if 2 is also false (maybe the machine died during temp->meta), + // then check if old exists. + // In the above, we assume that if a file exists, its integrity is ok. + if (metaDataFile.exists()) { + hasMeta = true; + } else if (tempMetadataFile.exists()) { + if (tempMetadataFile.renameTo(metaDataFile)) { + hasMeta = true; + } else { + throw new IOException("Renaming of " + tempMetadataFile.getName() + + " to " + metaDataFile.getName() + " failed"); + } + } else if (oldMetadataFile.exists()) { + if (oldMetadataFile.renameTo(metaDataFile)) { + hasMeta = true; + } else { + throw new IOException("Renaming of " + oldMetadataFile.getName() + + " to " + metaDataFile.getName() + " failed"); + } + } + if (hasMeta) { + // Now the metadata file has been found, delete old or temp files + // so it does not interfere with normal operation. 
+ if(oldMetadataFile.exists()) { + oldMetadataFile.delete(); + } + if(tempMetadataFile.exists()) { + tempMetadataFile.delete(); + } return new LogFileV3.SequentialReader(file, encryptionKeyProvider); } logFile = new RandomAccessFile(file, "r"); http://git-wip-us.apache.org/repos/asf/flume/blob/019358d9/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java index f768d23..b4c197e 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java @@ -138,7 +138,19 @@ class LogFileV3 extends LogFile { outputStream.close(); closed = true; if(!tmp.renameTo(file)) { - throw new IOException("Unable to move " + tmp + " over " + file); + //Some platforms don't support moving over an existing file. 
+ //So: + //log.meta -> log.meta.old + //log.meta.tmp -> log.meta + //delete log.meta.old + File oldFile = Serialization.getOldMetaDataFile(file); + if(!file.renameTo(oldFile)){ + throw new IOException("Unable to rename " + file + " to " + oldFile); + } + if(!tmp.renameTo(file)) { + throw new IOException("Unable to rename " + tmp + " over " + file); + } + oldFile.delete(); } } finally { if(!closed) { http://git-wip-us.apache.org/repos/asf/flume/blob/019358d9/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java index 6b0eeb3..ef8cf72 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java @@ -32,6 +32,7 @@ class Serialization { static final String METADATA_FILENAME = ".meta"; static final String METADATA_TMP_FILENAME = ".tmp"; + static final String OLD_METADATA_FILENAME = METADATA_FILENAME + ".old"; static File getMetaDataTempFile(File metaDataFile) { String metaDataFileName = metaDataFile.getName() + METADATA_TMP_FILENAME; @@ -43,4 +44,10 @@ class Serialization { return new File(file.getParentFile(), metaDataFileName); } + + // Support platforms that cannot do atomic renames - FLUME-1699 + static File getOldMetaDataFile(File file) { + String oldMetaDataFileName = file.getName() + OLD_METADATA_FILENAME; + return new File(file.getParentFile(), oldMetaDataFileName); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/019358d9/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java 
---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java index 4b69698..9e28599 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java @@ -167,6 +167,79 @@ public class TestLogFile { Assert.assertTrue(Arrays.equals(eventIn.getBody(), eventOut.getBody())); } } + + @Test + public void testReaderOldMetaFile() throws InterruptedException, IOException { + Map<Integer, Put> puts = Maps.newHashMap(); + for (int i = 0; i < 1000; i++) { + FlumeEvent eventIn = TestUtils.newPersistableEvent(); + Put put = new Put(++transactionID, WriteOrderOracle.next(), + eventIn); + ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put); + FlumeEventPointer ptr = logFileWriter.put(bytes); + puts.put(ptr.getOffset(), put); + } + //rename the meta file to meta.old + File metadataFile = Serialization.getMetaDataFile(dataFile); + File oldMetadataFile = Serialization.getOldMetaDataFile(dataFile); + if (!metadataFile.renameTo(oldMetadataFile)) { + Assert.fail("Renaming to meta.old failed"); + } + LogFile.SequentialReader reader = + LogFileFactory.getSequentialReader(dataFile, null); + Assert.assertTrue(metadataFile.exists()); + Assert.assertFalse(oldMetadataFile.exists()); + LogRecord entry; + while ((entry = reader.next()) != null) { + Integer offset = entry.getOffset(); + TransactionEventRecord record = entry.getEvent(); + Put put = puts.get(offset); + FlumeEvent eventIn = put.getEvent(); + Assert.assertEquals(put.getTransactionID(), record.getTransactionID()); + Assert.assertTrue(record instanceof Put); + FlumeEvent eventOut = ((Put) record).getEvent(); + Assert.assertEquals(eventIn.getHeaders(), 
eventOut.getHeaders()); + Assert.assertTrue(Arrays.equals(eventIn.getBody(), eventOut.getBody())); + } + } + + @Test + public void testReaderTempMetaFile() throws InterruptedException, IOException { + Map<Integer, Put> puts = Maps.newHashMap(); + for (int i = 0; i < 1000; i++) { + FlumeEvent eventIn = TestUtils.newPersistableEvent(); + Put put = new Put(++transactionID, WriteOrderOracle.next(), + eventIn); + ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put); + FlumeEventPointer ptr = logFileWriter.put(bytes); + puts.put(ptr.getOffset(), put); + } + //rename the meta file to the temp meta file + File metadataFile = Serialization.getMetaDataFile(dataFile); + File tempMetadataFile = Serialization.getMetaDataTempFile(dataFile); + File oldMetadataFile = Serialization.getOldMetaDataFile(dataFile); + oldMetadataFile.createNewFile(); //Make sure temp file is picked up. + if (!metadataFile.renameTo(tempMetadataFile)) { + Assert.fail("Renaming to meta.temp failed"); + } + LogFile.SequentialReader reader = + LogFileFactory.getSequentialReader(dataFile, null); + Assert.assertTrue(metadataFile.exists()); + Assert.assertFalse(tempMetadataFile.exists()); + Assert.assertFalse(oldMetadataFile.exists()); + LogRecord entry; + while ((entry = reader.next()) != null) { + Integer offset = entry.getOffset(); + TransactionEventRecord record = entry.getEvent(); + Put put = puts.get(offset); + FlumeEvent eventIn = put.getEvent(); + Assert.assertEquals(put.getTransactionID(), record.getTransactionID()); + Assert.assertTrue(record instanceof Put); + FlumeEvent eventOut = ((Put) record).getEvent(); + Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders()); + Assert.assertTrue(Arrays.equals(eventIn.getBody(), eventOut.getBody())); + } + } @Test public void testWriteDelimitedTo() throws IOException { if(dataFile.isFile()) {
