Repository: hadoop
Updated Branches:
  refs/heads/trunk 96892c469 -> 8e54da151


HDFS-13051. Fix deadlock during async editlog rolling if edit queue is full.
Contributed by Daryn Sharp.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8e54da15
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8e54da15
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8e54da15

Branch: refs/heads/trunk
Commit: 8e54da1511e78477c1d4655d5ff0a69d0330869f
Parents: 96892c4
Author: Xiao Chen <x...@apache.org>
Authored: Mon Sep 10 22:14:02 2018 -0700
Committer: Xiao Chen <x...@apache.org>
Committed: Mon Sep 10 22:34:23 2018 -0700

----------------------------------------------------------------------
 .../hdfs/server/namenode/FSEditLogAsync.java    |  61 ++++++-
 .../hdfs/server/namenode/TestEditLogRace.java   | 158 ++++++++++++++++++-
 2 files changed, 215 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
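For context, the hang fixed here follows the pattern sketched below: a caller that enters a synchronized section of the edit log (as log rolling does) blocks on the full async edit queue, while the sync thread needs that same monitor to write the transactions it dequeues, so the queue never drains. This is an illustrative sketch only; the class and method names are hypothetical and are not the actual FSEditLogAsync members.

  import java.util.concurrent.ArrayBlockingQueue;
  import java.util.concurrent.BlockingQueue;

  // Hypothetical names; illustrates the lock-ordering hang, not the real code.
  class EditQueueDeadlockSketch {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(4);

    // Analogous to a log-rolling caller that already holds the log's monitor:
    // it blocks on the full queue without ever releasing the monitor.
    synchronized void logEditWhileHoldingMonitor(String op)
        throws InterruptedException {
      queue.put(op);
    }

    // Analogous to the async sync thread: it dequeues an edit but must take
    // the same monitor to write it, so it can never drain the rest of the
    // queue, and the producer above waits forever.
    void syncThreadLoop() throws InterruptedException {
      while (true) {
        String op = queue.take();
        synchronized (this) {
          // write op to the journal
        }
      }
    }
  }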


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e54da15/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
index 7f39379..2b47398 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
@@ -24,7 +24,9 @@ import java.util.Deque;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -145,15 +147,68 @@ class FSEditLogAsync extends FSEditLog implements Runnable {
     edit.logSyncWait();
   }
 
+  // draining permits is intended to provide a high priority reservation.
+  // however, release of outstanding permits must be postponed until
+  // drained permits are restored to avoid starvation.  logic has some races
+  // but is good enough to serve its purpose.
+  private Semaphore overflowMutex = new Semaphore(8){
+    private AtomicBoolean draining = new AtomicBoolean();
+    private AtomicInteger pendingReleases = new AtomicInteger();
+    @Override
+    public int drainPermits() {
+      draining.set(true);
+      return super.drainPermits();
+    }
+    // while draining, count the releases until release(int)
+    private void tryRelease(int permits) {
+      pendingReleases.getAndAdd(permits);
+      if (!draining.get()) {
+        super.release(pendingReleases.getAndSet(0));
+      }
+    }
+    @Override
+    public void release() {
+      tryRelease(1);
+    }
+    @Override
+    public void release(int permits) {
+      draining.set(false);
+      tryRelease(permits);
+    }
+  };
+
   private void enqueueEdit(Edit edit) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("logEdit " + edit);
     }
     try {
-      if (!editPendingQ.offer(edit, 1, TimeUnit.SECONDS)) {
+      // not checking for overflow yet to avoid penalizing performance of
+      // the common case.  if there is persistent overflow, a mutex will be
+      // used to throttle contention on the queue.
+      if (!editPendingQ.offer(edit)) {
         Preconditions.checkState(
             isSyncThreadAlive(), "sync thread is not alive");
-        editPendingQ.put(edit);
+        if (Thread.holdsLock(this)) {
+          // if queue is full, synchronized caller must immediately relinquish
+          // the monitor before re-offering to avoid deadlock with sync thread
+          // which needs the monitor to write transactions.
+          int permits = overflowMutex.drainPermits();
+          try {
+            do {
+              this.wait(1000); // will be notified by next logSync.
+            } while (!editPendingQ.offer(edit));
+          } finally {
+            overflowMutex.release(permits);
+          }
+        } else {
+          // mutex will throttle contention during persistent overflow.
+          overflowMutex.acquire();
+          try {
+            editPendingQ.put(edit);
+          } finally {
+            overflowMutex.release();
+          }
+        }
       }
     } catch (Throwable t) {
       // should never happen!  failure to enqueue an edit is fatal

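The do/while + wait(1000) path above hinges on a basic monitor property: Object.wait() releases the held monitor while the caller is parked, which is what lets the sync thread run, write pending transactions, and drain the queue before the waiter re-offers its edit (the patch's comment notes the waiter is notified by the next logSync). Below is a rough sketch of that handoff, using hypothetical names rather than the real FSEditLogAsync internals.

  import java.util.concurrent.ArrayBlockingQueue;
  import java.util.concurrent.BlockingQueue;

  // Hypothetical names; shows only the wait/notify handoff the fix relies on.
  class MonitorHandoffSketch {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(4);

    // Caller that holds the monitor: instead of blocking in put(), it parks
    // in wait(), which releases the monitor so the consumer can make
    // progress, and re-offers once woken (or after the 1s timeout).
    synchronized void enqueueWhileHoldingMonitor(String op)
        throws InterruptedException {
      while (!queue.offer(op)) {
        wait(1000);
      }
    }

    // Consumer: drains the queue, briefly takes the monitor to write, and
    // wakes any producer parked in wait() above.
    void consumerLoop() throws InterruptedException {
      while (true) {
        String op = queue.take();
        synchronized (this) {
          // write op, then wake waiting producers
          notifyAll();
        }
      }
    }
  }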
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e54da15/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
index 2858e4e..a083140 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
@@ -28,10 +28,17 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.base.Supplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -52,6 +59,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
+import org.mockito.Mockito;
 import org.slf4j.event.Level;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -566,5 +574,153 @@ public class TestEditLogRace {
       LOG.info("Closing nn");
       if(namesystem != null) namesystem.close();
     }
-  }  
+  }
+
+  @Test(timeout=180000)
+  public void testDeadlock() throws Throwable {
+    GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.INFO);
+
+    Configuration conf = getConf();
+    NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
+    DFSTestUtil.formatNameNode(conf);
+    final FSNamesystem namesystem = FSNamesystem.loadFromDisk(conf);
+
+    final AtomicBoolean done = new AtomicBoolean(false);
+    final Semaphore blockerSemaphore = new Semaphore(0);
+    final CountDownLatch startSpamLatch = new CountDownLatch(1);
+
+    ExecutorService executor = Executors.newCachedThreadPool();
+    try {
+      final FSEditLog editLog = namesystem.getEditLog();
+
+      FSEditLogOp.OpInstanceCache cache = editLog.cache.get();
+      final FSEditLogOp op = FSEditLogOp.SetOwnerOp.getInstance(cache)
+        .setSource("/").setUser("u").setGroup("g");
+      // don't reset fields so instance can be reused.
+      final FSEditLogOp reuseOp = Mockito.spy(op);
+      Mockito.doNothing().when(reuseOp).reset();
+
+      // only job is spam edits.  it will fill the queue when the test
+      // loop injects the blockingOp.
+      Future[] logSpammers = new Future[16];
+      for (int i=0; i < logSpammers.length; i++) {
+        final int ii = i;
+        logSpammers[i] = executor.submit(new Callable() {
+          @Override
+          public Void call() throws Exception {
+            Thread.currentThread().setName("Log spammer " + ii);
+            // wait until a blocking edit op notifies us to go.
+            startSpamLatch.await();
+            for (int i = 0; !done.get() && i < 1000000; i++) {
+              // do not logSync here because we need to congest the queue.
+              editLog.logEdit(reuseOp);
+              if (i % 2048 == 0) {
+                LOG.info("thread[" + ii +"] edits=" + i);
+              }
+            }
+            assertTrue("too many edits", done.get());
+            return null;
+          }
+        });
+      }
+
+      // the tx id is set while the edit log monitor is held, so this will
+      // effectively stall the async processing thread which will cause the
+      // edit queue to fill up.
+      final FSEditLogOp blockingOp = Mockito.spy(op);
+      doAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            // flip the latch to unleash the spamming threads to congest
+            // the queue.
+            startSpamLatch.countDown();
+            // wait until unblocked after a synchronized thread is started.
+            blockerSemaphore.acquire();
+            invocation.callRealMethod();
+            return null;
+          }
+        }
+      ).when(blockingOp).setTransactionId(Mockito.anyLong());
+      // don't reset fields so instance can be reused.
+      Mockito.doNothing().when(blockingOp).reset();
+
+      // repeatedly overflow the queue and verify it doesn't deadlock.
+      for (int i = 0; i < 8; i++) {
+        // when the blockingOp is logged, it triggers the latch to unleash the
+        // spammers to overflow the edit queue, then waits for a permit
+        // from blockerSemaphore that will be released at the bottom of
+        // this loop.
+        Future blockingEdit = executor.submit(new Callable() {
+          @Override
+          public Void call() throws Exception {
+            Thread.currentThread().setName("Log blocker");
+            editLog.logEdit(blockingOp);
+            editLog.logSync();
+            return null;
+          }
+        });
+
+        // wait for spammers to seize up the edit log.
+        long startTxId = editLog.getLastWrittenTxIdWithoutLock();
+        final long[] txIds = { startTxId, startTxId, startTxId };
+        GenericTestUtils.waitFor(new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            txIds[0] = txIds[1];
+            txIds[1] = txIds[2];
+            txIds[2] = editLog.getLastWrittenTxIdWithoutLock();
+            return (txIds[0] == txIds[1] &&
+                    txIds[1] == txIds[2] &&
+                    txIds[2] > startTxId);
+          }
+        }, 100, 10000);
+
+        // callers that synchronize on the edit log while the queue is full
+        // are prone to deadlock if the locking is incorrect.  at this point:
+        // 1. the blocking edit is holding the log's monitor.
+        // 2. the spammers have filled the queue.
+        // 3. the spammers are blocked waiting to queue another edit.
+        // Now we'll start another thread to synchronize on the log (simulates
+        // what log rolling does), unblock the op currently holding the
+        // monitor, and ensure deadlock does not occur.
+        CountDownLatch readyLatch = new CountDownLatch(1);
+        Future synchedEdits = executor.submit(new Callable() {
+          @Override
+          public Void call() throws Exception {
+            Thread.currentThread().setName("Log synchronizer");
+            // the sync is CRUCIAL for this test.  it's what causes edit
+            // log rolling to deadlock when queue is full.
+            readyLatch.countDown();
+            synchronized (editLog) {
+              editLog.logEdit(reuseOp);
+              editLog.logSync();
+            }
+            return null;
+          }
+        });
+        // unblock the edit jammed in setting its txid.  queued edits should
+        // start flowing and the synced edits should complete.
+        readyLatch.await();
+        blockerSemaphore.release();
+        blockingEdit.get();
+        synchedEdits.get();
+      }
+
+      // tell spammers to stop.
+      done.set(true);
+      for (int i=0; i < logSpammers.length; i++) {
+        logSpammers[i].get();
+      }
+      // just make sure everything can be synced.
+      editLog.logSyncAll();
+    } finally {
+      LOG.info("Closing nn");
+      executor.shutdownNow();
+      if (namesystem != null) {
+        namesystem.getFSImage().getStorage().close();
+        namesystem.close();
+      }
+    }
+  }
 }

