Author: ivank
Date: Tue Nov 29 17:46:36 2011
New Revision: 1207997

URL: http://svn.apache.org/viewvc?rev=1207997&view=rev
Log:
BOOKKEEPER-62: Bookie can not start when encountering corrupted records (breed via ivank)

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java Modified: zookeeper/bookkeeper/trunk/CHANGES.txt zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Modified: zookeeper/bookkeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1207997&r1=1207996&r2=1207997&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/CHANGES.txt (original) +++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Nov 29 17:46:36 2011 @@ -86,6 +86,8 @@ BUGFIXES: BOOKKEEPER-125: log4j still used in some places (ivank) + BOOKKEEPER-62: Bookie can not start when encountering corrupted records (breed via ivank) + hedwig-server/ BOOKKEEPER-43: NullPointException when releasing topic (Sijie Guo via breed) Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1207997&r1=1207996&r2=1207997&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Tue Nov 29 17:46:36 2011 @@ -411,34 +411,39 @@ public class EntryLogger { long pos = LOGFILE_HEADER_SIZE; ConcurrentHashMap<Long, Boolean> entryLogLedgers = new ConcurrentHashMap<Long, Boolean>(); // Read through the entry log file and extract the ledger ID's. - while (true) { - // Check if we've finished reading the entry log file. - if (pos >= bc.size()) { - break; + try { + while (true) { + // Check if we've finished reading the entry log file. 
+ if (pos >= bc.size()) { + break; + } + if (bc.read(sizeBuff, pos) != sizeBuff.capacity()) { + throw new IOException("Short read from entrylog " + entryLogId); + } + pos += 4; + sizeBuff.flip(); + int entrySize = sizeBuff.getInt(); + if (entrySize > 1024 * 1024) { + LOG.error("Sanity check failed for entry size of " + entrySize + " at location " + pos + " in " + + entryLogId); + } + byte data[] = new byte[entrySize]; + ByteBuffer buff = ByteBuffer.wrap(data); + int rc = bc.read(buff, pos); + if (rc != data.length) { + throw new IOException("Short read for entryLog " + entryLogId + "@" + pos + "(" + rc + "!=" + + data.length + ")"); + } + buff.flip(); + long ledgerId = buff.getLong(); + entryLogLedgers.put(ledgerId, true); + // Advance position to the next entry and clear sizeBuff. + pos += entrySize; + sizeBuff.clear(); } - if (bc.read(sizeBuff, pos) != sizeBuff.capacity()) { - throw new IOException("Short read from entrylog " + entryLogId); - } - pos += 4; - sizeBuff.flip(); - int entrySize = sizeBuff.getInt(); - if (entrySize > 1024 * 1024) { - LOG.error("Sanity check failed for entry size of " + entrySize + " at location " + pos + " in " - + entryLogId); - } - byte data[] = new byte[entrySize]; - ByteBuffer buff = ByteBuffer.wrap(data); - int rc = bc.read(buff, pos); - if (rc != data.length) { - throw new IOException("Short read for entryLog " + entryLogId + "@" + pos + "(" + rc + "!=" - + data.length + ")"); - } - buff.flip(); - long ledgerId = buff.getLong(); - entryLogLedgers.put(ledgerId, true); - // Advance position to the next entry and clear sizeBuff. 
- pos += entrySize; - sizeBuff.clear(); + } catch(IOException e) { + LOG.info("Premature exception when processing " + entryLogId + + "recovery will take care of the problem", e); } LOG.info("Retrieved all ledgers that comprise entryLogId: " + entryLogId + ", values: " + entryLogLedgers); entryLogs2LedgersMap.put(entryLogId, entryLogLedgers); Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java?rev=1207997&view=auto ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java (added) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java Tue Nov 29 17:46:36 2011 @@ -0,0 +1,90 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.bookkeeper.bookie; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.util.Map; + +import junit.framework.TestCase; + +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EntryLogTest extends TestCase { + static Logger LOG = LoggerFactory.getLogger(EntryLogTest.class); + + @Before + public void setUp() throws Exception { + } + + @Test + public void testCorruptEntryLog() throws IOException, SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException { + File tmpDir = File.createTempFile("bkTest", ".dir"); + tmpDir.delete(); + tmpDir.mkdir(); + ServerConfiguration conf = new ServerConfiguration(); + conf.setLedgerDirNames(new String[] {tmpDir.toString()}); + // create some entries + EntryLogger logger = new EntryLogger(conf, null); + logger.addEntry(1, generateEntry(1, 1)); + logger.addEntry(3, generateEntry(3, 1)); + logger.addEntry(2, generateEntry(2, 1)); + logger.flush(); + // now lets truncate the file to corrupt the last entry, which simulates a partial write + File f = new File(tmpDir, "0.log"); + RandomAccessFile raf = new RandomAccessFile(f, "rw"); + raf.setLength(raf.length()-10); + raf.close(); + // now see which ledgers are in the log + logger = new EntryLogger(conf, null); + Field entryLogs2LedgersMapField = logger.getClass().getDeclaredField("entryLogs2LedgersMap"); + entryLogs2LedgersMapField.setAccessible(true); + @SuppressWarnings("unchecked") + Map<Long, Map<Long, Boolean>> ledgersMap = (Map<Long, Map<Long, Boolean>>) entryLogs2LedgersMapField.get(logger); + LOG.info("LedgersMap.get(0) {}", ledgersMap.get(0L)); + assertNotNull(ledgersMap.get(0L).get(1L)); + assertNull(ledgersMap.get(0L).get(2L)); + 
assertNotNull(ledgersMap.get(0L).get(3L)); + } + + private ByteBuffer generateEntry(long ledger, long entry) { + ByteBuffer bb = ByteBuffer.wrap(new byte[64]); + bb.putLong(ledger); + bb.putLong(entry); + bb.put(("ledger"+ledger).getBytes()); + bb.flip(); + return bb; + } + + @After + public void tearDown() throws Exception { + } + +}