sijie closed pull request #1406: Issue #1405: ReplicationWorker should back-off 
retrying.
URL: https://github.com/apache/bookkeeper/pull/1406
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index bdc26e767..a763ddf69 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -101,6 +101,7 @@
     protected static final String ZK_RETRY_BACKOFF_START_MS = 
"zkRetryBackoffStartMs";
     protected static final String ZK_RETRY_BACKOFF_MAX_MS = 
"zkRetryBackoffMaxMs";
     protected static final String OPEN_LEDGER_REREPLICATION_GRACE_PERIOD = 
"openLedgerRereplicationGracePeriod";
+    protected static final String LOCK_RELEASE_OF_FAILED_LEDGER_GRACE_PERIOD = 
"lockReleaseOfFailedLedgerGracePeriod";
     //ReadOnly mode support on all disk full
     protected static final String READ_ONLY_MODE_ENABLED = 
"readOnlyModeEnabled";
     //Whether the bookie is force started in ReadOnly mode
@@ -1235,6 +1236,33 @@ public long getOpenLedgerRereplicationGracePeriod() {
         return getLong(OPEN_LEDGER_REREPLICATION_GRACE_PERIOD, 30000);
     }
 
+    /**
+     * Set the grace period so that if the replication worker fails to
+     * replicate an underreplicated ledger for more than
+     * ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING
+     * number of times, then instead of releasing the lock immediately after a
+     * failed attempt, it will hold the underreplicated ledger lock for this
+     * grace period and then release the lock.
+     *
+     * @param waitTime
+     */
+    public void setLockReleaseOfFailedLedgerGracePeriod(String waitTime) {
+        setProperty(LOCK_RELEASE_OF_FAILED_LEDGER_GRACE_PERIOD, waitTime);
+    }
+
+    /**
+     * Get the grace period for which the replication worker waits before
+     * releasing the lock after it fails to replicate for more than
+     * ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING
+     * number of times.
+     *
+     * @return
+     */
+    public long getLockReleaseOfFailedLedgerGracePeriod() {
+        return getLong(LOCK_RELEASE_OF_FAILED_LEDGER_GRACE_PERIOD, 60000);
+    }
+
     /**
      * Get the number of bytes we should use as capacity for
      * org.apache.bookkeeper.bookie.BufferedReadChannel.
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
index 4f48b78f7..b1afa816d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
@@ -47,6 +47,7 @@
     String NUM_ENTRIES_WRITTEN = "NUM_ENTRIES_WRITTEN";
     String NUM_BYTES_WRITTEN = "NUM_BYTES_WRITTEN";
     String REPLICATE_EXCEPTION = "exceptions";
+    String NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER = 
"NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER";
 
     String BK_CLIENT_SCOPE = "bk_client";
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index 276ed7120..7d51a7a39 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -20,11 +20,16 @@
 package org.apache.bookkeeper.replication;
 
 import static 
org.apache.bookkeeper.replication.ReplicationStats.BK_CLIENT_SCOPE;
+import static 
org.apache.bookkeeper.replication.ReplicationStats.NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER;
 import static 
org.apache.bookkeeper.replication.ReplicationStats.NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED;
 import static 
org.apache.bookkeeper.replication.ReplicationStats.REPLICATE_EXCEPTION;
 import static 
org.apache.bookkeeper.replication.ReplicationStats.REREPLICATE_OP;
 
 import com.google.common.base.Stopwatch;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -36,6 +41,8 @@
 import java.util.TimerTask;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.bookkeeper.bookie.BookieThread;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
@@ -71,6 +78,9 @@
 public class ReplicationWorker implements Runnable {
     private static final Logger LOG = LoggerFactory
             .getLogger(ReplicationWorker.class);
+    private static final int REPLICATED_FAILED_LEDGERS_MAXSIZE = 100;
+    static final int MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING = 
10;
+
     private final LedgerUnderreplicationManager underreplicationManager;
     private final ServerConfiguration conf;
     private final ZooKeeper zkc;
@@ -82,12 +92,15 @@
     private final long rwRereplicateBackoffMs;
     private final long openLedgerRereplicationGracePeriod;
     private final Timer pendingReplicationTimer;
+    private final long lockReleaseOfFailedLedgerGracePeriod;
 
     // Expose Stats
     private final StatsLogger statsLogger;
     private final OpStatsLogger rereplicateOpStats;
     private final Counter numLedgersReplicated;
+    private final Counter numDeferLedgerLockReleaseOfFailedLedger;
     private final Map<String, Counter> exceptionCounters;
+    final LoadingCache<Long, AtomicInteger> replicationFailedLedgers;
 
     /**
      * Replication worker for replicating the ledger fragments from
@@ -143,13 +156,23 @@ public ReplicationWorker(final ZooKeeper zkc,
         this.workerThread = new BookieThread(this, "ReplicationWorker");
         this.openLedgerRereplicationGracePeriod = conf
                 .getOpenLedgerRereplicationGracePeriod();
+        this.lockReleaseOfFailedLedgerGracePeriod = 
conf.getLockReleaseOfFailedLedgerGracePeriod();
         this.rwRereplicateBackoffMs = conf.getRwRereplicateBackoffMs();
         this.pendingReplicationTimer = new Timer("PendingReplicationTimer");
+        this.replicationFailedLedgers = 
CacheBuilder.newBuilder().maximumSize(REPLICATED_FAILED_LEDGERS_MAXSIZE)
+                .build(new CacheLoader<Long, AtomicInteger>() {
+                    @Override
+                    public AtomicInteger load(Long key) throws Exception {
+                        return new AtomicInteger();
+                    }
+                });
 
         // Expose Stats
         this.statsLogger = statsLogger;
         this.rereplicateOpStats = 
this.statsLogger.getOpStatsLogger(REREPLICATE_OP);
         this.numLedgersReplicated = 
this.statsLogger.getCounter(NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED);
+        this.numDeferLedgerLockReleaseOfFailedLedger = this.statsLogger
+                .getCounter(NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER);
         this.exceptionCounters = new HashMap<String, Counter>();
     }
 
@@ -273,6 +296,16 @@ private boolean rereplicate(long ledgerIdToReplicate) 
throws InterruptedExceptio
                 
underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
                 return true;
             } else {
+                if (replicationFailedLedgers.getUnchecked(ledgerIdToReplicate)
+                        .incrementAndGet() == 
MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING) {
+                    deferLedgerLockRelease = true;
+                    LOG.error(
+                            "ReplicationWorker failed to replicate Ledger : {} 
for {} number of times, "
+                            + "so deferring the ledger lock release",
+                            ledgerIdToReplicate, 
MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING);
+                    deferLedgerLockReleaseOfFailedLedger(ledgerIdToReplicate);
+                    numDeferLedgerLockReleaseOfFailedLedger.inc();
+                }
                 // Releasing the underReplication ledger lock and compete
                 // for the replication again for the pending fragments
                 return false;
@@ -440,6 +473,25 @@ public void run() {
         pendingReplicationTimer.schedule(timerTask, gracePeriod);
     }
 
+    /**
+     * Schedules a timer task for releasing the lock.
+     */
+    private void deferLedgerLockReleaseOfFailedLedger(final long ledgerId) {
+        TimerTask timerTask = new TimerTask() {
+            @Override
+            public void run() {
+                try {
+                    replicationFailedLedgers.invalidate(ledgerId);
+                    
underreplicationManager.releaseUnderreplicatedLedger(ledgerId);
+                } catch (UnavailableException e) {
+                    LOG.error("UnavailableException while replicating 
fragments of ledger {}", ledgerId, e);
+                    shutdown();
+                }
+            }
+        };
+        pendingReplicationTimer.schedule(timerTask, 
lockReleaseOfFailedLedgerGracePeriod);
+    }
+
     /**
      * Stop the replication worker service.
      */
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index 609e1541b..0b15f1d51 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -22,6 +22,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.net.URI;
 import java.util.ArrayList;
@@ -29,7 +30,10 @@
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import lombok.Cleanup;
+
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.ClientUtil;
@@ -449,6 +453,135 @@ public void 
testRWShouldReplicateTheLedgersAfterTimeoutIfLastFragmentIsUR()
 
     }
 
+    /**
+     * Tests that ReplicationWorker will not make more than
+     * 
ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING
+     * number of replication failure attempts, and that if it fails more than
+     * this many times it will defer the lock release by
+     * lockReleaseOfFailedLedgerGracePeriod.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testBookiesNotAvailableScenarioForReplicationWorker() throws 
Exception {
+        int ensembleSize = 3;
+        LedgerHandle lh = bkc.createLedger(ensembleSize, ensembleSize, 
BookKeeper.DigestType.CRC32, TESTPASSWD);
+
+        for (int i = 0; i < 10; i++) {
+            lh.addEntry(data);
+        }
+        lh.close();
+
+        BookieSocketAddress[] bookiesKilled = new 
BookieSocketAddress[ensembleSize];
+        ServerConfiguration[] killedBookiesConfig = new 
ServerConfiguration[ensembleSize];
+
+        // kill all bookies
+        for (int i = 0; i < ensembleSize; i++) {
+            bookiesKilled[i] = 
LedgerHandleAdapter.getLedgerMetadata(lh).getEnsembles().get(0L).get(i);
+            killedBookiesConfig[i] = getBkConf(bookiesKilled[i]);
+            LOG.info("Killing Bookie", bookiesKilled[i]);
+            killBookie(bookiesKilled[i]);
+        }
+
+        // start new bookiesToKill number of bookies
+        for (int i = 0; i < ensembleSize; i++) {
+            BookieSocketAddress newBkAddr = startNewBookieAndReturnAddress();
+        }
+
+        // create couple of replicationworkers
+        baseConf.setLockReleaseOfFailedLedgerGracePeriod("500");
+        ReplicationWorker rw1 = new ReplicationWorker(zkc, baseConf);
+        ReplicationWorker rw2 = new ReplicationWorker(zkc, baseConf);
+
+        @Cleanup
+        MetadataClientDriver clientDriver = MetadataDrivers
+                
.getClientDriver(URI.create(baseClientConf.getMetadataServiceUri()));
+        clientDriver.initialize(baseClientConf, scheduler, 
NullStatsLogger.INSTANCE, Optional.empty());
+
+        LedgerManagerFactory mFactory = clientDriver.getLedgerManagerFactory();
+
+        LedgerUnderreplicationManager underReplicationManager = 
mFactory.newLedgerUnderreplicationManager();
+        try {
+            for (int i = 0; i < bookiesKilled.length; i++) {
+                underReplicationManager.markLedgerUnderreplicated(lh.getId(), 
bookiesKilled[i].toString());
+            }
+            while (!ReplicationTestUtil.isLedgerInUnderReplication(zkc, 
lh.getId(), basePath)) {
+                Thread.sleep(100);
+            }
+            rw1.start();
+            rw2.start();
+
+            AtomicBoolean isBookieRestarted = new AtomicBoolean(false);
+
+            (new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        Thread.sleep(4000);
+                        isBookieRestarted.set(true);
+                        /*
+                         * after sleeping for 4000 msecs, restart one of the
+                         * bookies, so that replication can succeed.
+                         */
+                        startBookie(killedBookiesConfig[0]);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            })).start();
+
+            while (!isBookieRestarted.get()) {
+                /*
+                 * since all the bookies containing the ledger entries are down,
+                 * replication wouldn't have succeeded.
+                 */
+                assertTrue("Ledger: " + lh.getId() + " should be 
underreplicated",
+                        ReplicationTestUtil.isLedgerInUnderReplication(zkc, 
lh.getId(), basePath));
+                /*
+                 * check that for both replication workers the number of failed
+                 * attempts is less than ReplicationWorker.
+                 * MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING
+                 */
+                int failedAttempts = 
rw1.replicationFailedLedgers.get(lh.getId()).get();
+                assertTrue(
+                        "The number of failed attempts should be less than "
+                                + 
"ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING, "
+                                + "but it is "
+                                + failedAttempts,
+                        failedAttempts <= 
ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING);
+
+                failedAttempts = 
rw2.replicationFailedLedgers.get(lh.getId()).get();
+                assertTrue(
+                        "The number of failed attempts should be less than "
+                                + 
"ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING, "
+                                + "but it is "
+                                + failedAttempts,
+                        failedAttempts <= 
ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING);
+
+                Thread.sleep(50);
+            }
+
+            /**
+             * since one of the killed bookies is restarted, the replication
+             * worker should succeed in replicating this underreplicated ledger
+             * and it shouldn't be underreplicated anymore.
+             */
+            int timeToWaitForReplicationToComplete = 2000;
+            int timeWaited = 0;
+            while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, 
lh.getId(), basePath)) {
+                Thread.sleep(100);
+                timeWaited += 100;
+                if (timeWaited == timeToWaitForReplicationToComplete) {
+                    fail("Ledger should be replicated by now");
+                }
+            }
+        } finally {
+            rw1.shutdown();
+            rw2.shutdown();
+            underReplicationManager.close();
+        }
+    }
+
     /**
      * Tests that ReplicationWorker should not have identified for postponing
      * the replication if ledger is in open state and lastFragment is not in
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index 481d82826..f366c604e 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -626,6 +626,9 @@ groups:
   - param: openLedgerRereplicationGracePeriod
     description: The grace period, in seconds, that the replication worker 
waits before fencing and replicating a ledger fragment that's still being 
written to upon bookie failure.
     default: 30
+  - param: lockReleaseOfFailedLedgerGracePeriod
+    description: The grace period so that if the replication worker fails to replicate an underreplicated ledger for more than ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING number of times, then instead of releasing the lock immediately after a failed attempt, it will hold the underreplicated ledger lock for this grace period and then release the lock.
+    default: 60
   - param: rwRereplicateBackoffMs
     description: The time to backoff when replication worker encounters 
exceptions on replicating a ledger, in milliseconds.
     default: 5000


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to