Updated Branches:
  refs/heads/trunk d25ef4242 -> eb7eab659

FLUME-1498. File channel log updates and queue updates should be atomic.

(Brock Noland via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/eb7eab65
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/eb7eab65
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/eb7eab65

Branch: refs/heads/trunk
Commit: eb7eab6593e241d9f67308298871f5586734b657
Parents: d25ef42
Author: Hari Shreedharan <[email protected]>
Authored: Tue Aug 21 13:58:38 2012 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Tue Aug 21 13:58:38 2012 -0700

----------------------------------------------------------------------
 .../org/apache/flume/channel/file/FileChannel.java |  178 +++++----
 .../java/org/apache/flume/channel/file/Log.java    |  291 ++++++---------
 2 files changed, 208 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/eb7eab65/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index b5a0b88..995bad5 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -340,17 +340,37 @@ public class FileChannel extends BasicChannelSemantics {
             "committing more frequently, increasing capacity or " +
             "increasing thread count. " + channelNameDescriptor);
       }
+      // this does not need to be in the critical section as it does not
+      // modify the structure of the log or queue.
       if(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
         throw new ChannelException("Cannot acquire capacity. "
             + channelNameDescriptor);
       }
+      boolean success = false;
+      boolean lockAcquired = log.tryLockShared();
       try {
+        if(!lockAcquired) {
+          throw new ChannelException("Failed to obtain lock for writing to the log. "
+              + "Try increasing the log write timeout value or disabling it by "
+              + "setting it to 0. " + channelNameDescriptor);
+        }
         FlumeEventPointer ptr = log.put(transactionID, event);
         Preconditions.checkState(putList.offer(ptr), "putList offer failed "
              + channelNameDescriptor);
+        queue.addWithoutCommit(ptr, transactionID);
+        success = true;
       } catch (IOException e) {
         throw new ChannelException("Put failed due to IO error "
                 + channelNameDescriptor, e);
+      } finally {
+        if(lockAcquired) {
+          log.unlockShared();
+        }
+        if(!success) {
+          // release slot obtained in the case
+          // the put fails for any reason
+          queueRemaining.release();
+        }
       }
     }
 
@@ -363,24 +383,32 @@ public class FileChannel extends BasicChannelSemantics {
             "increasing capacity, or increasing thread count. "
                + channelNameDescriptor);
       }
-      FlumeEventPointer ptr = queue.removeHead(transactionID);
-      if(ptr != null) {
-        try {
-          // first add to takeList so that if write to disk
-          // fails rollback actually does it's work
-          Preconditions.checkState(takeList.offer(ptr), "takeList offer failed "
-               + channelNameDescriptor);
-          log.take(transactionID, ptr); // write take to disk
-          Event event = log.get(ptr);
-          return event;
-        } catch (IOException e) {
-          throw new ChannelException("Take failed due to IO error "
-                  + channelNameDescriptor, e);
+      if(!log.tryLockShared()) {
+        throw new ChannelException("Failed to obtain lock for writing to the log. "
+            + "Try increasing the log write timeout value or disabling it by "
+            + "setting it to 0. " + channelNameDescriptor);
+      }
+      try {
+        FlumeEventPointer ptr = queue.removeHead(transactionID);
+        if(ptr != null) {
+          try {
+            // first add to takeList so that if write to disk
+            // fails rollback actually does it's work
+            Preconditions.checkState(takeList.offer(ptr), "takeList offer failed "
+                 + channelNameDescriptor);
+            log.take(transactionID, ptr); // write take to disk
+            Event event = log.get(ptr);
+            return event;
+          } catch (IOException e) {
+            throw new ChannelException("Take failed due to IO error "
+                    + channelNameDescriptor, e);
+          }
         }
+        return null;
+      } finally {
+        log.unlockShared();
       }
-      return null;
     }
-
     @Override
     protected void doCommit() throws InterruptedException {
       int puts = putList.size();
@@ -388,55 +416,52 @@ public class FileChannel extends BasicChannelSemantics {
       if(puts > 0) {
         Preconditions.checkState(takes == 0, "nonzero puts and takes "
                 + channelNameDescriptor);
-        /*
-         * OK to not put this in synchronized(queue) block, because if a
-         * checkpoint occurs after the commit it is fine.
-         * The puts will be in the inflightputs file in the checkpoint.
-         * The commit did not return, so previous hop would not get success
-         * for the commit.
-         * The replay will not see the commit in the log file(since the
-         * commit is before the checkpoint in the logs) - and hence the events
-         * are not added back to the queue, so no duplicates or data loss.
-         */
+        if(!log.tryLockShared()) {
+          throw new ChannelException("Failed to obtain lock for writing to the log. "
+              + "Try increasing the log write timeout value or disabling it by "
+              + "setting it to 0. " + channelNameDescriptor);
+        }
         try {
           log.commitPut(transactionID);
           channelCounter.addToEventPutSuccessCount(puts);
+          synchronized (queue) {
+            while(!putList.isEmpty()) {
+              if(!queue.addTail(putList.removeFirst())) {
+                StringBuilder msg = new StringBuilder();
+                msg.append("Queue add failed, this shouldn't be able to ");
+                msg.append("happen. A portion of the transaction has been ");
+                msg.append("added to the queue but the remaining portion ");
+                msg.append("cannot be added. Those messages will be consumed ");
+                msg.append("despite this transaction failing. Please report.");
+                msg.append(channelNameDescriptor);
+                LOG.error(msg.toString());
+                Preconditions.checkState(false, msg.toString());
+              }
+            }
+            queue.completeTransaction(transactionID);
+          }
         } catch (IOException e) {
           throw new ChannelException("Commit failed due to IO error "
                   + channelNameDescriptor, e);
+        } finally {
+          log.unlockShared();
         }
-        synchronized (queue) {
-          while(!putList.isEmpty()) {
-            if(!queue.addTail(putList.removeFirst())) {
-              StringBuilder msg = new StringBuilder();
-              msg.append("Queue add failed, this shouldn't be able to ");
-              msg.append("happen. A portion of the transaction has been ");
-              msg.append("added to the queue but the remaining portion ");
-              msg.append("cannot be added. Those messages will be consumed ");
-              msg.append("despite this transaction failing. Please report.");
-              msg.append(channelNameDescriptor);
-              LOG.error(msg.toString());
-              Preconditions.checkState(false, msg.toString());
-            }
-          }
-          queue.completeTransaction(transactionID);
-        }
+
       } else if (takes > 0) {
+        if(!log.tryLockShared()) {
+          throw new ChannelException("Failed to obtain lock for writing to the log. "
+              + "Try increasing the log write timeout value or disabling it by "
+              + "setting it to 0. " + channelNameDescriptor);
+        }
         try {
-          /*
-           * OK to not have the commit take in synchronized(queue) block.
-           * If a checkpoint happens in between the commitTake and
-           * the completeTxn call, the takes will be in the inflightTakes file.
-           * When the channel replays the events, these takes will be put
-           * back into the channel - and will cause duplicates, but the
-           * number of duplicates will be pretty limited.
-           */
           log.commitTake(transactionID);
           queue.completeTransaction(transactionID);
           channelCounter.addToEventTakeSuccessCount(takes);
         } catch (IOException e) {
           throw new ChannelException("Commit failed due to IO error "
-                  + channelNameDescriptor, e);
+              + channelNameDescriptor, e);
+        } finally {
+          log.unlockShared();
         }
         queueRemaining.release(takes);
       }
@@ -444,41 +469,44 @@ public class FileChannel extends BasicChannelSemantics {
       takeList.clear();
       channelCounter.setChannelSize(queue.getSize());
     }
-
     @Override
     protected void doRollback() throws InterruptedException {
       int puts = putList.size();
       int takes = takeList.size();
-      /*
-       * OK to not have the rollback within the synchronized(queue) block.
-       * If a checkpoint occurs between the rollback and the synchronized(queue)
-       * block, the takes are kept in the inflighttakes file in the checkpoint.
-       * During a replay the commit or rollback for the takes are not seen,
-       * so the takes are re-inserted into the queue - which is a rollback
-       * anyway.
-       */
+      boolean lockAcquired = log.tryLockShared();
       try {
+        if(!lockAcquired) {
+          throw new ChannelException("Failed to obtain lock for writing to the log. "
+              + "Try increasing the log write timeout value or disabling it by "
+              + "setting it to 0. " + channelNameDescriptor);
+        }
         log.rollback(transactionID);
+        if(takes > 0) {
+          Preconditions.checkState(puts == 0, "nonzero puts and takes "
+              + channelNameDescriptor);
+          synchronized (queue) {
+            while (!takeList.isEmpty()) {
+              Preconditions.checkState(queue.addHead(takeList.removeLast()),
+                  "Queue add failed, this shouldn't be able to happen "
+                      + channelNameDescriptor);
+            }
+            queue.completeTransaction(transactionID);
+          }
+        }
+        putList.clear();
+        takeList.clear();
+        channelCounter.setChannelSize(queue.getSize());
       } catch (IOException e) {
         throw new ChannelException("Commit failed due to IO error "
-                + channelNameDescriptor, e);
-      }
-      if(takes > 0) {
-        Preconditions.checkState(puts == 0, "nonzero puts and takes "
-            + channelNameDescriptor);
-        synchronized (queue) {
-          while (!takeList.isEmpty()) {
-            Preconditions.checkState(queue.addHead(takeList.removeLast()),
-                    "Queue add failed, this shouldn't be able to happen "
-                    + channelNameDescriptor);
-          }
-          queue.completeTransaction(transactionID);
+            + channelNameDescriptor, e);
+      } finally {
+        if(lockAcquired) {
+          log.unlockShared();
         }
+        // since rollback is being called, puts will never make it on
+        // to the queue and we need to be sure to release the resources
+        queueRemaining.release(puts);
       }
-      queueRemaining.release(puts);
-      putList.clear();
-      takeList.clear();
-      channelCounter.setChannelSize(queue.getSize());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/eb7eab65/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index c356ca4..9b13423 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -52,6 +52,12 @@ import java.util.SortedSet;
  * Stores FlumeEvents on disk and pointers to the events in a in memory queue.
  * Once a log object is created the replay method should be called to reconcile
  * the on disk write ahead log with the last checkpoint of the queue.
+ *
+ * Before calling any of commitPut/commitTake/get/put/rollback/take
+ * Log.tryLockShared should be called and the above operations
+ * should only be called if tryLockShared returns true. After
+ * the operation and any additional modifications of the
+ * FlumeEventQueue, the Log.unlockShared method should be called.
  */
 class Log {
   public static final String PREFIX = "log-";
@@ -75,7 +81,13 @@ class Log {
   private final Map<String, FileLock> locks;
   private final ReentrantReadWriteLock checkpointLock =
       new ReentrantReadWriteLock(true);
+  /**
+   * Shared lock
+   */
   private final ReadLock checkpointReadLock = checkpointLock.readLock();
+  /**
+   * Exclusive lock
+   */
   private final WriteLock checkpointWriterLock = checkpointLock.writeLock();
   private int logWriteTimeout;
   private final String channelName;
@@ -208,7 +220,8 @@ class Log {
   void replay() throws IOException {
     Preconditions.checkState(!open, "Cannot replay after Log has been opened");
 
-    checkpointWriterLock.lock();
+    Preconditions.checkState(tryLockExclusive(), "Cannot obtain lock on "
+        + channelNameDescriptor);
 
     try {
       /*
@@ -285,7 +298,7 @@ class Log {
       }
       Throwables.propagate(ex);
     } finally {
-      checkpointWriterLock.unlock();
+      unlockExclusive();
     }
   }
 
@@ -312,31 +325,10 @@ class Log {
   FlumeEvent get(FlumeEventPointer pointer) throws IOException,
   InterruptedException {
     Preconditions.checkState(open, "Log is closed");
-
-    boolean lockAcquired = false;
-    try {
-      lockAcquired = checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS);
-    } catch (InterruptedException ex) {
-      LOGGER.warn("Interrupted while waiting for log write lock", ex);
-      Thread.currentThread().interrupt();
-    }
-
-    if (!lockAcquired) {
-      throw new IOException("Failed to obtain lock for writing to the log. "
-          + "Try increasing the log write timeout value or disabling it by "
-          + "setting it to 0.");
-    }
-
-    try {
-      int id = pointer.getFileID();
-      LogFile.RandomReader logFile = idLogFileMap.get(id);
-      Preconditions.checkNotNull(logFile, "LogFile is null for id " + id);
-      return logFile.get(pointer.getOffset());
-    } finally {
-      if (lockAcquired) {
-        checkpointReadLock.unlock();
-      }
-    }
+    int id = pointer.getFileID();
+    LogFile.RandomReader logFile = idLogFileMap.get(id);
+    Preconditions.checkNotNull(logFile, "LogFile is null for id " + id);
+    return logFile.get(pointer.getOffset());
   }
 
   /**
@@ -351,46 +343,23 @@ class Log {
   FlumeEventPointer put(long transactionID, Event event)
       throws IOException {
     Preconditions.checkState(open, "Log is closed");
-
-    boolean lockAcquired = false;
+    FlumeEvent flumeEvent = new FlumeEvent(
+        event.getHeaders(), event.getBody());
+    Put put = new Put(transactionID, flumeEvent);
+    put.setLogWriteOrderID(WriteOrderOracle.next());
+    ByteBuffer buffer = TransactionEventRecord.toByteBuffer(put);
+    int logFileIndex = nextLogWriter(transactionID);
+    if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
+      roll(logFileIndex, buffer);
+    }
+    boolean error = true;
     try {
-      lockAcquired = checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS);
-    } catch (InterruptedException ex) {
-      LOGGER.warn("Interrupted while waiting for log write lock on " +
-          channelNameDescriptor, ex);
-      Thread.currentThread().interrupt();
-    }
-
-    if (!lockAcquired) {
-      throw new IOException("Failed to obtain lock for writing to the log. "
-          + "Try increasing the log write timeout value or disabling it by "
-          + "setting it to 0. " + channelNameDescriptor);
-    }
-
-    try {
-      FlumeEvent flumeEvent = new FlumeEvent(
-                    event.getHeaders(), event.getBody());
-      Put put = new Put(transactionID, flumeEvent);
-      put.setLogWriteOrderID(WriteOrderOracle.next());
-      ByteBuffer buffer = TransactionEventRecord.toByteBuffer(put);
-      int logFileIndex = nextLogWriter(transactionID);
-      if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
-        roll(logFileIndex, buffer);
-      }
-      boolean error = true;
-      try {
-        FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer);
-        queue.addWithoutCommit(ptr, transactionID);
-        error = false;
-        return ptr;
-      } finally {
-        if (error) {
-          roll(logFileIndex);
-        }
-      }
+      FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer);
+      error = false;
+      return ptr;
     } finally {
-      if (lockAcquired) {
-        checkpointReadLock.unlock();
+      if (error) {
+        roll(logFileIndex);
       }
     }
   }
@@ -406,42 +375,21 @@ class Log {
   void take(long transactionID, FlumeEventPointer pointer)
       throws IOException {
     Preconditions.checkState(open, "Log is closed");
-
-    boolean lockAcquired = false;
-    try {
-      lockAcquired = checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS);
-    } catch (InterruptedException ex) {
-      LOGGER.warn("Interrupted while waiting for log write lock", ex);
-      Thread.currentThread().interrupt();
-    }
-
-    if (!lockAcquired) {
-      throw new IOException("Failed to obtain lock for writing to the log. "
-          + "Try increasing the log write timeout value or disabling it by "
-          + "setting it to 0. " + channelNameDescriptor);
-    }
-
+    Take take = new Take(transactionID, pointer.getOffset(),
+        pointer.getFileID());
+    take.setLogWriteOrderID(WriteOrderOracle.next());
+    ByteBuffer buffer = TransactionEventRecord.toByteBuffer(take);
+    int logFileIndex = nextLogWriter(transactionID);
+    if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
+      roll(logFileIndex, buffer);
+    }
+    boolean error = true;
     try {
-      Take take = new Take(transactionID, pointer.getOffset(),
-          pointer.getFileID());
-      take.setLogWriteOrderID(WriteOrderOracle.next());
-      ByteBuffer buffer = TransactionEventRecord.toByteBuffer(take);
-      int logFileIndex = nextLogWriter(transactionID);
-      if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
-        roll(logFileIndex, buffer);
-      }
-      boolean error = true;
-      try {
-        logFiles.get(logFileIndex).take(buffer);
-        error = false;
-      } finally {
-        if (error) {
-          roll(logFileIndex);
-        }
-      }
+      logFiles.get(logFileIndex).take(buffer);
+      error = false;
     } finally {
-      if (lockAcquired) {
-        checkpointReadLock.unlock();
+      if (error) {
+        roll(logFileIndex);
       }
     }
   }
@@ -456,44 +404,23 @@ class Log {
   void rollback(long transactionID) throws IOException {
     Preconditions.checkState(open, "Log is closed");
 
-    boolean lockAcquired = false;
-    try {
-      lockAcquired = checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS);
-    } catch (InterruptedException ex) {
-      LOGGER.warn("Interrupted while waiting for log write lock", ex);
-      Thread.currentThread().interrupt();
-    }
-
-    if (!lockAcquired) {
-      throw new IOException("Failed to obtain lock for writing to the log. "
-          + "Try increasing the log write timeout value or disabling it by "
-          + "setting it to 0. "+ channelNameDescriptor);
-    }
-
     if(LOGGER.isDebugEnabled()) {
       LOGGER.debug("Rolling back " + transactionID);
     }
-
+    Rollback rollback = new Rollback(transactionID);
+    rollback.setLogWriteOrderID(WriteOrderOracle.next());
+    ByteBuffer buffer = TransactionEventRecord.toByteBuffer(rollback);
+    int logFileIndex = nextLogWriter(transactionID);
+    if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
+      roll(logFileIndex, buffer);
+    }
+    boolean error = true;
     try {
-      Rollback rollback = new Rollback(transactionID);
-      rollback.setLogWriteOrderID(WriteOrderOracle.next());
-      ByteBuffer buffer = TransactionEventRecord.toByteBuffer(rollback);
-      int logFileIndex = nextLogWriter(transactionID);
-      if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
-        roll(logFileIndex, buffer);
-      }
-      boolean error = true;
-      try {
-        logFiles.get(logFileIndex).rollback(buffer);
-        error = false;
-      } finally {
-        if (error) {
-          roll(logFileIndex);
-        }
-      }
+      logFiles.get(logFileIndex).rollback(buffer);
+      error = false;
     } finally {
-      if (lockAcquired) {
-        checkpointReadLock.unlock();
+      if (error) {
+        roll(logFileIndex);
       }
     }
   }
@@ -532,6 +459,36 @@ class Log {
     commit(transactionID, TransactionEventRecord.Type.TAKE.get());
   }
 
+
+  private boolean tryLockExclusive() {
+    try {
+      return checkpointWriterLock.tryLock(checkpointWriteTimeout,
+          TimeUnit.SECONDS);
+    } catch (InterruptedException ex) {
+      LOGGER.warn("Interrupted while waiting for log exclusive lock", ex);
+      Thread.currentThread().interrupt();
+    }
+    return false;
+  }
+  private void unlockExclusive()  {
+    checkpointWriterLock.unlock();
+  }
+
+  boolean tryLockShared() {
+    try {
+      return checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS);
+    } catch (InterruptedException ex) {
+      LOGGER.warn("Interrupted while waiting for log shared lock", ex);
+      Thread.currentThread().interrupt();
+    }
+    return false;
+  }
+
+  void unlockShared()  {
+    checkpointReadLock.unlock();
+  }
+
+
   /**
    * Synchronization required since we do not want this
    * to be called during a checkpoint.
@@ -590,41 +547,20 @@ class Log {
   private void commit(long transactionID, short type) throws IOException {
 
     Preconditions.checkState(open, "Log is closed");
-
-    boolean lockAcquired = false;
+    Commit commit = new Commit(transactionID, type);
+    commit.setLogWriteOrderID(WriteOrderOracle.next());
+    ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit);
+    int logFileIndex = nextLogWriter(transactionID);
+    if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
+      roll(logFileIndex, buffer);
+    }
+    boolean error = true;
     try {
-      lockAcquired = checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS);
-    } catch (InterruptedException ex) {
-      LOGGER.warn("Interrupted while waiting for log write lock", ex);
-      Thread.currentThread().interrupt();
-    }
-
-    if (!lockAcquired) {
-      throw new IOException("Failed to obtain lock for writing to the log. "
-          + "Try increasing the log write timeout value or disabling it by "
-          + "setting it to 0. " + channelNameDescriptor);
-    }
-
-    try {
-      Commit commit = new Commit(transactionID, type);
-      commit.setLogWriteOrderID(WriteOrderOracle.next());
-      ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit);
-      int logFileIndex = nextLogWriter(transactionID);
-      if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
-        roll(logFileIndex, buffer);
-      }
-      boolean error = true;
-      try {
-        logFiles.get(logFileIndex).commit(buffer);
-        error = false;
-      } finally {
-        if (error) {
-          roll(logFileIndex);
-        }
-      }
+      logFiles.get(logFileIndex).commit(buffer);
+      error = false;
     } finally {
-      if (lockAcquired) {
-        checkpointReadLock.unlock();
+      if (error) {
+        roll(logFileIndex);
       }
     }
   }
@@ -661,15 +597,7 @@ class Log {
    */
   private synchronized void roll(int index, ByteBuffer buffer)
       throws IOException {
-    boolean lockAcquired = false;
-    try {
-      lockAcquired = checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS);
-    } catch (InterruptedException ex) {
-      LOGGER.warn("Interrupted while waiting for log write lock", ex);
-      Thread.currentThread().interrupt();
-    }
-
-    if (!lockAcquired) {
+    if (!tryLockShared()) {
       throw new IOException("Failed to obtain lock for writing to the log. "
           + "Try increasing the log write timeout value or disabling it by "
           + "setting it to 0. "+ channelNameDescriptor);
@@ -701,9 +629,7 @@ class Log {
         }
       }
     } finally {
-      if (lockAcquired) {
-        checkpointReadLock.unlock();
-      }
+      unlockShared();
     }
   }
 
@@ -722,15 +648,8 @@ class Log {
    * @throws IOException if we are unable to write the checkpoint out to disk
    */
   private boolean writeCheckpoint(boolean force) throws Exception {
-    boolean lockAcquired = false;
     boolean checkpointCompleted = false;
-    try {
-      lockAcquired = checkpointWriterLock.tryLock(this.checkpointWriteTimeout,
-          TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      LOGGER.warn("Interrupted while waiting to acquire lock.", e);
-      Thread.currentThread().interrupt();
-    }
+    boolean lockAcquired = tryLockExclusive();
     if(!lockAcquired) {
       return false;
     }
@@ -784,7 +703,7 @@ class Log {
         checkpointCompleted = true;
       }
     } finally {
-      checkpointWriterLock.unlock();
+      unlockExclusive();
     }
     //Do the deletes outside the checkpointWriterLock
     //Delete logic is expensive.

Reply via email to