Updated Branches: refs/heads/trunk d25ef4242 -> eb7eab659
FLUME-1498. File channel log updates and queue updates should be atomic. (Brock Noland via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/eb7eab65 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/eb7eab65 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/eb7eab65 Branch: refs/heads/trunk Commit: eb7eab6593e241d9f67308298871f5586734b657 Parents: d25ef42 Author: Hari Shreedharan <[email protected]> Authored: Tue Aug 21 13:58:38 2012 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Tue Aug 21 13:58:38 2012 -0700 ---------------------------------------------------------------------- .../org/apache/flume/channel/file/FileChannel.java | 178 +++++---- .../java/org/apache/flume/channel/file/Log.java | 291 ++++++--------- 2 files changed, 208 insertions(+), 261 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/eb7eab65/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java index b5a0b88..995bad5 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java @@ -340,17 +340,37 @@ public class FileChannel extends BasicChannelSemantics { "committing more frequently, increasing capacity or " + "increasing thread count. " + channelNameDescriptor); } + // this does not need to be in the critical section as it does not + // modify the structure of the log or queue. if(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) { throw new ChannelException("Cannot acquire capacity. " + channelNameDescriptor); } + boolean success = false; + boolean lockAcquired = log.tryLockShared(); try { + if(!lockAcquired) { + throw new ChannelException("Failed to obtain lock for writing to the log. " + + "Try increasing the log write timeout value or disabling it by " + + "setting it to 0. " + channelNameDescriptor); + } FlumeEventPointer ptr = log.put(transactionID, event); Preconditions.checkState(putList.offer(ptr), "putList offer failed " + channelNameDescriptor); + queue.addWithoutCommit(ptr, transactionID); + success = true; } catch (IOException e) { throw new ChannelException("Put failed due to IO error " + channelNameDescriptor, e); + } finally { + if(lockAcquired) { + log.unlockShared(); + } + if(!success) { + // release slot obtained in the case + // the put fails for any reason + queueRemaining.release(); + } } } @@ -363,24 +383,32 @@ public class FileChannel extends BasicChannelSemantics { "increasing capacity, or increasing thread count. " + channelNameDescriptor); } - FlumeEventPointer ptr = queue.removeHead(transactionID); - if(ptr != null) { - try { - // first add to takeList so that if write to disk - // fails rollback actually does it's work - Preconditions.checkState(takeList.offer(ptr), "takeList offer failed " - + channelNameDescriptor); - log.take(transactionID, ptr); // write take to disk - Event event = log.get(ptr); - return event; - } catch (IOException e) { - throw new ChannelException("Take failed due to IO error " - + channelNameDescriptor, e); + if(!log.tryLockShared()) { + throw new ChannelException("Failed to obtain lock for writing to the log. " + + "Try increasing the log write timeout value or disabling it by " + + "setting it to 0. " + channelNameDescriptor); + } + try { + FlumeEventPointer ptr = queue.removeHead(transactionID); + if(ptr != null) { + try { + // first add to takeList so that if write to disk + // fails rollback actually does it's work + Preconditions.checkState(takeList.offer(ptr), "takeList offer failed " + + channelNameDescriptor); + log.take(transactionID, ptr); // write take to disk + Event event = log.get(ptr); + return event; + } catch (IOException e) { + throw new ChannelException("Take failed due to IO error " + + channelNameDescriptor, e); + } } + return null; + } finally { + log.unlockShared(); } - return null; } - @Override protected void doCommit() throws InterruptedException { int puts = putList.size(); @@ -388,55 +416,52 @@ public class FileChannel extends BasicChannelSemantics { if(puts > 0) { Preconditions.checkState(takes == 0, "nonzero puts and takes " + channelNameDescriptor); - /* - * OK to not put this in synchronized(queue) block, because if a - * checkpoint occurs after the commit it is fine. - * The puts will be in the inflightputs file in the checkpoint. - * The commit did not return, so previous hop would not get success - * for the commit. - * The replay will not see the commit in the log file(since the - * commit is before the checkpoint in the logs) - and hence the events - * are not added back to the queue, so no duplicates or data loss. - */ + if(!log.tryLockShared()) { + throw new ChannelException("Failed to obtain lock for writing to the log. " + + "Try increasing the log write timeout value or disabling it by " + + "setting it to 0. " + channelNameDescriptor); + } try { log.commitPut(transactionID); channelCounter.addToEventPutSuccessCount(puts); + synchronized (queue) { + while(!putList.isEmpty()) { + if(!queue.addTail(putList.removeFirst())) { + StringBuilder msg = new StringBuilder(); + msg.append("Queue add failed, this shouldn't be able to "); + msg.append("happen. A portion of the transaction has been "); + msg.append("added to the queue but the remaining portion "); + msg.append("cannot be added. Those messages will be consumed "); + msg.append("despite this transaction failing. Please report."); + msg.append(channelNameDescriptor); + LOG.error(msg.toString()); + Preconditions.checkState(false, msg.toString()); + } + } + queue.completeTransaction(transactionID); + } } catch (IOException e) { throw new ChannelException("Commit failed due to IO error " + channelNameDescriptor, e); + } finally { + log.unlockShared(); } - synchronized (queue) { - while(!putList.isEmpty()) { - if(!queue.addTail(putList.removeFirst())) { - StringBuilder msg = new StringBuilder(); - msg.append("Queue add failed, this shouldn't be able to "); - msg.append("happen. A portion of the transaction has been "); - msg.append("added to the queue but the remaining portion "); - msg.append("cannot be added. Those messages will be consumed "); - msg.append("despite this transaction failing. Please report."); - msg.append(channelNameDescriptor); - LOG.error(msg.toString()); - Preconditions.checkState(false, msg.toString()); - } - } - queue.completeTransaction(transactionID); - } + } else if (takes > 0) { + if(!log.tryLockShared()) { + throw new ChannelException("Failed to obtain lock for writing to the log. " + + "Try increasing the log write timeout value or disabling it by " + + "setting it to 0. " + channelNameDescriptor); + } try { - /* - * OK to not have the commit take in synchronized(queue) block. - * If a checkpoint happens in between the commitTake and - * the completeTxn call, the takes will be in the inflightTakes file. - * When the channel replays the events, these takes will be put - * back into the channel - and will cause duplicates, but the - * number of duplicates will be pretty limited. - */ log.commitTake(transactionID); queue.completeTransaction(transactionID); channelCounter.addToEventTakeSuccessCount(takes); } catch (IOException e) { throw new ChannelException("Commit failed due to IO error " - + channelNameDescriptor, e); + + channelNameDescriptor, e); + } finally { + log.unlockShared(); } queueRemaining.release(takes); } @@ -444,41 +469,44 @@ public class FileChannel extends BasicChannelSemantics { takeList.clear(); channelCounter.setChannelSize(queue.getSize()); } - @Override protected void doRollback() throws InterruptedException { int puts = putList.size(); int takes = takeList.size(); - /* - * OK to not have the rollback within the synchronized(queue) block. - * If a checkpoint occurs between the rollback and the synchronized(queue) - * block, the takes are kept in the inflighttakes file in the checkpoint. - * During a replay the commit or rollback for the takes are not seen, - * so the takes are re-inserted into the queue - which is a rollback - * anyway. - */ + boolean lockAcquired = log.tryLockShared(); try { + if(!lockAcquired) { + throw new ChannelException("Failed to obtain lock for writing to the log. " + + "Try increasing the log write timeout value or disabling it by " + + "setting it to 0. " + channelNameDescriptor); + } log.rollback(transactionID); + if(takes > 0) { + Preconditions.checkState(puts == 0, "nonzero puts and takes " + + channelNameDescriptor); + synchronized (queue) { + while (!takeList.isEmpty()) { + Preconditions.checkState(queue.addHead(takeList.removeLast()), + "Queue add failed, this shouldn't be able to happen " + + channelNameDescriptor); + } + queue.completeTransaction(transactionID); + } + } + putList.clear(); + takeList.clear(); + channelCounter.setChannelSize(queue.getSize()); } catch (IOException e) { throw new ChannelException("Commit failed due to IO error " - + channelNameDescriptor, e); - } - if(takes > 0) { - Preconditions.checkState(puts == 0, "nonzero puts and takes " - + channelNameDescriptor); - synchronized (queue) { - while (!takeList.isEmpty()) { - Preconditions.checkState(queue.addHead(takeList.removeLast()), - "Queue add failed, this shouldn't be able to happen " - + channelNameDescriptor); - } - queue.completeTransaction(transactionID); + + channelNameDescriptor, e); + } finally { + if(lockAcquired) { + log.unlockShared(); } + // since rollback is being called, puts will never make it on + // to the queue and we need to be sure to release the resources + queueRemaining.release(puts); } - queueRemaining.release(puts); - putList.clear(); - takeList.clear(); - channelCounter.setChannelSize(queue.getSize()); } } } http://git-wip-us.apache.org/repos/asf/flume/blob/eb7eab65/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index c356ca4..9b13423 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -52,6 +52,12 @@ import java.util.SortedSet; * Stores FlumeEvents on disk and pointers to the events in a in memory queue. * Once a log object is created the replay method should be called to reconcile * the on disk write ahead log with the last checkpoint of the queue. + * + * Before calling any of commitPut/commitTake/get/put/rollback/take + * Log.tryLockShared should be called and the above operations + * should only be called if tryLockShared returns true. After + * the operation and any additional modifications of the + * FlumeEventQueue, the Log.unlockShared method should be called. */ class Log { public static final String PREFIX = "log-"; @@ -75,7 +81,13 @@ class Log { private final Map<String, FileLock> locks; private final ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(true); + /** + * Shared lock + */ private final ReadLock checkpointReadLock = checkpointLock.readLock(); + /** + * Exclusive lock + */ private final WriteLock checkpointWriterLock = checkpointLock.writeLock(); private int logWriteTimeout; private final String channelName; @@ -208,7 +220,8 @@ class Log { void replay() throws IOException { Preconditions.checkState(!open, "Cannot replay after Log has been opened"); - checkpointWriterLock.lock(); + Preconditions.checkState(tryLockExclusive(), "Cannot obtain lock on " + + channelNameDescriptor); try { /* @@ -285,7 +298,7 @@ class Log { } Throwables.propagate(ex); } finally { - checkpointWriterLock.unlock(); + unlockExclusive(); } } @@ -312,31 +325,10 @@ class Log { FlumeEvent get(FlumeEventPointer pointer) throws IOException, InterruptedException { Preconditions.checkState(open, "Log is closed"); - - boolean lockAcquired = false; - try { - lockAcquired = checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS); - } catch (InterruptedException ex) { - LOGGER.warn("Interrupted while waiting for log write lock", ex); - Thread.currentThread().interrupt(); - } - - if (!lockAcquired) { - throw new IOException("Failed to obtain lock for writing to the log. " - + "Try increasing the log write timeout value or disabling it by " - + "setting it to 0."); - } - - try { - int id = pointer.getFileID(); - LogFile.RandomReader logFile = idLogFileMap.get(id); - Preconditions.checkNotNull(logFile, "LogFile is null for id " + id); - return logFile.get(pointer.getOffset()); - } finally { - if (lockAcquired) { - checkpointReadLock.unlock(); - } - } + int id = pointer.getFileID(); + LogFile.RandomReader logFile = idLogFileMap.get(id); + Preconditions.checkNotNull(logFile, "LogFile is null for id " + id); + return logFile.get(pointer.getOffset()); } /** @@ -351,46 +343,23 @@ class Log { FlumeEventPointer put(long transactionID, Event event) throws IOException { Preconditions.checkState(open, "Log is closed"); - - boolean lockAcquired = false; + FlumeEvent flumeEvent = new FlumeEvent( + event.getHeaders(), event.getBody()); + Put put = new Put(transactionID, flumeEvent); + put.setLogWriteOrderID(WriteOrderOracle.next()); + ByteBuffer buffer = TransactionEventRecord.toByteBuffer(put); + int logFileIndex = nextLogWriter(transactionID); + if (logFiles.get(logFileIndex).isRollRequired(buffer)) { + roll(logFileIndex, buffer); + } + boolean error = true; try { - lockAcquired = checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS); - } catch (InterruptedException ex) { - LOGGER.warn("Interrupted while waiting for log write lock on " + - channelNameDescriptor, ex); - Thread.currentThread().interrupt(); - } - - if (!lockAcquired) { - throw new IOException("Failed to obtain lock for writing to the log. " - + "Try increasing the log write timeout value or disabling it by " - + "setting it to 0. " + channelNameDescriptor); - } - - try { - FlumeEvent flumeEvent = new FlumeEvent( - event.getHeaders(), event.getBody()); - Put put = new Put(transactionID, flumeEvent); - put.setLogWriteOrderID(WriteOrderOracle.next()); - ByteBuffer buffer = TransactionEventRecord.toByteBuffer(put); - int logFileIndex = nextLogWriter(transactionID); - if (logFiles.get(logFileIndex).isRollRequired(buffer)) { - roll(logFileIndex, buffer); - } - boolean error = true; - try { - FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer); - queue.addWithoutCommit(ptr, transactionID); - error = false; - return ptr; - } finally { - if (error) { - roll(logFileIndex); - } - } + FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer); + error = false; + return ptr; } finally { - if (lockAcquired) { - checkpointReadLock.unlock(); + if (error) { + roll(logFileIndex); } } } @@ -406,42 +375,21 @@ class Log { void take(long transactionID, FlumeEventPointer pointer) throws IOException { Preconditions.checkState(open, "Log is closed"); - - boolean lockAcquired = false; - try { - lockAcquired = checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS); - } catch (InterruptedException ex) { - LOGGER.warn("Interrupted while waiting for log write lock", ex); - Thread.currentThread().interrupt(); - } - - if (!lockAcquired) { - throw new IOException("Failed to obtain lock for writing to the log. " - + "Try increasing the log write timeout value or disabling it by " - + "setting it to 0. " + channelNameDescriptor); - } - + Take take = new Take(transactionID, pointer.getOffset(), + pointer.getFileID()); + take.setLogWriteOrderID(WriteOrderOracle.next()); + ByteBuffer buffer = TransactionEventRecord.toByteBuffer(take); + int logFileIndex = nextLogWriter(transactionID); + if (logFiles.get(logFileIndex).isRollRequired(buffer)) { + roll(logFileIndex, buffer); + } + boolean error = true; try { - Take take = new Take(transactionID, pointer.getOffset(), - pointer.getFileID()); - take.setLogWriteOrderID(WriteOrderOracle.next()); - ByteBuffer buffer = TransactionEventRecord.toByteBuffer(take); - int logFileIndex = nextLogWriter(transactionID); - if (logFiles.get(logFileIndex).isRollRequired(buffer)) { - roll(logFileIndex, buffer); - } - boolean error = true; - try { - logFiles.get(logFileIndex).take(buffer); - error = false; - } finally { - if (error) { - roll(logFileIndex); - } - } + logFiles.get(logFileIndex).take(buffer); + error = false; } finally { - if (lockAcquired) { - checkpointReadLock.unlock(); + if (error) { + roll(logFileIndex); } } } @@ -456,44 +404,23 @@ class Log { void rollback(long transactionID) throws IOException { Preconditions.checkState(open, "Log is closed"); - boolean lockAcquired = false; - try { - lockAcquired = checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS); - } catch (InterruptedException ex) { - LOGGER.warn("Interrupted while waiting for log write lock", ex); - Thread.currentThread().interrupt(); - } - - if (!lockAcquired) { - throw new IOException("Failed to obtain lock for writing to the log. " - + "Try increasing the log write timeout value or disabling it by " - + "setting it to 0. "+ channelNameDescriptor); - } - if(LOGGER.isDebugEnabled()) { LOGGER.debug("Rolling back " + transactionID); } - + Rollback rollback = new Rollback(transactionID); + rollback.setLogWriteOrderID(WriteOrderOracle.next()); + ByteBuffer buffer = TransactionEventRecord.toByteBuffer(rollback); + int logFileIndex = nextLogWriter(transactionID); + if (logFiles.get(logFileIndex).isRollRequired(buffer)) { + roll(logFileIndex, buffer); + } + boolean error = true; try { - Rollback rollback = new Rollback(transactionID); - rollback.setLogWriteOrderID(WriteOrderOracle.next()); - ByteBuffer buffer = TransactionEventRecord.toByteBuffer(rollback); - int logFileIndex = nextLogWriter(transactionID); - if (logFiles.get(logFileIndex).isRollRequired(buffer)) { - roll(logFileIndex, buffer); - } - boolean error = true; - try { - logFiles.get(logFileIndex).rollback(buffer); - error = false; - } finally { - if (error) { - roll(logFileIndex); - } - } + logFiles.get(logFileIndex).rollback(buffer); + error = false; } finally { - if (lockAcquired) { - checkpointReadLock.unlock(); + if (error) { + roll(logFileIndex); } } } @@ -532,6 +459,36 @@ class Log { commit(transactionID, TransactionEventRecord.Type.TAKE.get()); } + + private boolean tryLockExclusive() { + try { + return checkpointWriterLock.tryLock(checkpointWriteTimeout, + TimeUnit.SECONDS); + } catch (InterruptedException ex) { + LOGGER.warn("Interrupted while waiting for log exclusive lock", ex); + Thread.currentThread().interrupt(); + } + return false; + } + private void unlockExclusive() { + checkpointWriterLock.unlock(); + } + + boolean tryLockShared() { + try { + return checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS); + } catch (InterruptedException ex) { + LOGGER.warn("Interrupted while waiting for log shared lock", ex); + Thread.currentThread().interrupt(); + } + return false; + } + + void unlockShared() { + checkpointReadLock.unlock(); + } + + /** * Synchronization required since we do not want this * to be called during a checkpoint. @@ -590,41 +547,20 @@ class Log { private void commit(long transactionID, short type) throws IOException { Preconditions.checkState(open, "Log is closed"); - - boolean lockAcquired = false; + Commit commit = new Commit(transactionID, type); + commit.setLogWriteOrderID(WriteOrderOracle.next()); + ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit); + int logFileIndex = nextLogWriter(transactionID); + if (logFiles.get(logFileIndex).isRollRequired(buffer)) { + roll(logFileIndex, buffer); + } + boolean error = true; try { - lockAcquired = checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS); - } catch (InterruptedException ex) { - LOGGER.warn("Interrupted while waiting for log write lock", ex); - Thread.currentThread().interrupt(); - } - - if (!lockAcquired) { - throw new IOException("Failed to obtain lock for writing to the log. " - + "Try increasing the log write timeout value or disabling it by " - + "setting it to 0. " + channelNameDescriptor); - } - - try { - Commit commit = new Commit(transactionID, type); - commit.setLogWriteOrderID(WriteOrderOracle.next()); - ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit); - int logFileIndex = nextLogWriter(transactionID); - if (logFiles.get(logFileIndex).isRollRequired(buffer)) { - roll(logFileIndex, buffer); - } - boolean error = true; - try { - logFiles.get(logFileIndex).commit(buffer); - error = false; - } finally { - if (error) { - roll(logFileIndex); - } - } + logFiles.get(logFileIndex).commit(buffer); + error = false; } finally { - if (lockAcquired) { - checkpointReadLock.unlock(); + if (error) { + roll(logFileIndex); } } } @@ -661,15 +597,7 @@ class Log { */ private synchronized void roll(int index, ByteBuffer buffer) throws IOException { - boolean lockAcquired = false; - try { - lockAcquired = checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS); - } catch (InterruptedException ex) { - LOGGER.warn("Interrupted while waiting for log write lock", ex); - Thread.currentThread().interrupt(); - } - - if (!lockAcquired) { + if (!tryLockShared()) { throw new IOException("Failed to obtain lock for writing to the log. " + "Try increasing the log write timeout value or disabling it by " + "setting it to 0. "+ channelNameDescriptor); @@ -701,9 +629,7 @@ class Log { } } } finally { - if (lockAcquired) { - checkpointReadLock.unlock(); - } + unlockShared(); } } @@ -722,15 +648,8 @@ class Log { * @throws IOException if we are unable to write the checkpoint out to disk */ private boolean writeCheckpoint(boolean force) throws Exception { - boolean lockAcquired = false; boolean checkpointCompleted = false; - try { - lockAcquired = checkpointWriterLock.tryLock(this.checkpointWriteTimeout, - TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOGGER.warn("Interrupted while waiting to acquire lock.", e); - Thread.currentThread().interrupt(); - } + boolean lockAcquired = tryLockExclusive(); if(!lockAcquired) { return false; } @@ -784,7 +703,7 @@ class Log { checkpointCompleted = true; } } finally { - checkpointWriterLock.unlock(); + unlockExclusive(); } //Do the deletes outside the checkpointWriterLock //Delete logic is expensive.
