This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c077a9  Issue #1405: ReplicationWorker should back-off retrying.
8c077a9 is described below

commit 8c077a93e50ea844e0c39ca042ed2aa8cbd6d256
Author: cguttapalem <cguttapa...@salesforce.com>
AuthorDate: Thu May 17 01:38:20 2018 -0700

    Issue #1405: ReplicationWorker should back-off retrying.
    
    Descriptions of the changes in this PR:
    
    ReplicationWorker should back off replication
    after a threshold number of replication failures for a ledger.
    
    Currently the ReplicationWorker busy-retries whenever
    replication of a ledger fails; instead it should
    back off once replication has failed a threshold
    number of times. This is done by deferring the
    release of the underreplicated ledger lock by
    the 'lockReleaseOfFailedLedgerGracePeriod' amount
    of time.
    
    Master Issue: #1405
    
    Author: cguttapalem <cguttapa...@salesforce.com>
    
    Reviewers: Enrico Olivelli <eolive...@gmail.com>, Jia Zhai <None>, Sijie Guo <si...@apache.org>
    
    This closes #1406 from reddycharan/fixreplicator, closes #1405
---
 .../bookkeeper/conf/ServerConfiguration.java       |  28 +++++
 .../bookkeeper/replication/ReplicationStats.java   |   1 +
 .../bookkeeper/replication/ReplicationWorker.java  |  52 ++++++++
 .../replication/TestReplicationWorker.java         | 133 +++++++++++++++++++++
 site/_data/config/bk_server.yaml                   |   3 +
 5 files changed, 217 insertions(+)
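
As a rough illustration of how the new setting described above is meant to be used, here is a minimal sketch (hypothetical example code, not part of this patch); it relies only on the accessors added to ServerConfiguration below, and the value is interpreted in milliseconds:

    import org.apache.bookkeeper.conf.ServerConfiguration;

    public class GracePeriodConfigSketch {
        public static void main(String[] args) {
            ServerConfiguration conf = new ServerConfiguration();
            // Hold the underreplicated-ledger lock for 30 seconds (value in
            // milliseconds) once a ledger has hit the failure threshold,
            // instead of releasing the lock right after each failed attempt.
            conf.setLockReleaseOfFailedLedgerGracePeriod("30000");
            System.out.println("lock release grace period (ms): "
                    + conf.getLockReleaseOfFailedLedgerGracePeriod());
        }
    }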

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index bdc26e7..a763ddf 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -101,6 +101,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     protected static final String ZK_RETRY_BACKOFF_START_MS = "zkRetryBackoffStartMs";
     protected static final String ZK_RETRY_BACKOFF_MAX_MS = "zkRetryBackoffMaxMs";
     protected static final String OPEN_LEDGER_REREPLICATION_GRACE_PERIOD = "openLedgerRereplicationGracePeriod";
+    protected static final String LOCK_RELEASE_OF_FAILED_LEDGER_GRACE_PERIOD = "lockReleaseOfFailedLedgerGracePeriod";
     //ReadOnly mode support on all disk full
     protected static final String READ_ONLY_MODE_ENABLED = "readOnlyModeEnabled";
     //Whether the bookie is force started in ReadOnly mode
@@ -1236,6 +1237,33 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     }
 
     /**
+     * Set the grace period so that if the replication worker fails to replicate
+     * an underreplicated ledger for more than
+     * ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING
+     * times, then instead of releasing the lock immediately after the failed
+     * attempt, it holds the underreplicated ledger lock for this grace period
+     * and only then releases the lock.
+     *
+     * @param waitTime the grace period, in milliseconds
+     */
+    public void setLockReleaseOfFailedLedgerGracePeriod(String waitTime) {
+        setProperty(LOCK_RELEASE_OF_FAILED_LEDGER_GRACE_PERIOD, waitTime);
+    }
+
+    /**
+     * Get the grace period for which the replication worker waits before
+     * releasing the lock, after the replication worker has failed to replicate
+     * a ledger for more than
+     * ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING
+     * times.
+     *
+     * @return the grace period, in milliseconds
+     */
+    public long getLockReleaseOfFailedLedgerGracePeriod() {
+        return getLong(LOCK_RELEASE_OF_FAILED_LEDGER_GRACE_PERIOD, 60000);
+    }
+
+    /**
      * Get the number of bytes we should use as capacity for
      * org.apache.bookkeeper.bookie.BufferedReadChannel.
      * Default is 512 bytes
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
index 4f48b78..b1afa81 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
@@ -47,6 +47,7 @@ public interface ReplicationStats {
     String NUM_ENTRIES_WRITTEN = "NUM_ENTRIES_WRITTEN";
     String NUM_BYTES_WRITTEN = "NUM_BYTES_WRITTEN";
     String REPLICATE_EXCEPTION = "exceptions";
+    String NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER = "NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER";
 
     String BK_CLIENT_SCOPE = "bk_client";
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index 276ed71..7d51a7a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -20,11 +20,16 @@
 package org.apache.bookkeeper.replication;
 
 import static org.apache.bookkeeper.replication.ReplicationStats.BK_CLIENT_SCOPE;
+import static org.apache.bookkeeper.replication.ReplicationStats.NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER;
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED;
 import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATE_EXCEPTION;
 import static org.apache.bookkeeper.replication.ReplicationStats.REREPLICATE_OP;
 
 import com.google.common.base.Stopwatch;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -36,6 +41,8 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.bookkeeper.bookie.BookieThread;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
@@ -71,6 +78,9 @@ import org.slf4j.LoggerFactory;
 public class ReplicationWorker implements Runnable {
     private static final Logger LOG = LoggerFactory
             .getLogger(ReplicationWorker.class);
+    private static final int REPLICATED_FAILED_LEDGERS_MAXSIZE = 100;
+    static final int MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING = 10;
+
     private final LedgerUnderreplicationManager underreplicationManager;
     private final ServerConfiguration conf;
     private final ZooKeeper zkc;
@@ -82,12 +92,15 @@ public class ReplicationWorker implements Runnable {
     private final long rwRereplicateBackoffMs;
     private final long openLedgerRereplicationGracePeriod;
     private final Timer pendingReplicationTimer;
+    private final long lockReleaseOfFailedLedgerGracePeriod;
 
     // Expose Stats
     private final StatsLogger statsLogger;
     private final OpStatsLogger rereplicateOpStats;
     private final Counter numLedgersReplicated;
+    private final Counter numDeferLedgerLockReleaseOfFailedLedger;
     private final Map<String, Counter> exceptionCounters;
+    final LoadingCache<Long, AtomicInteger> replicationFailedLedgers;
 
     /**
      * Replication worker for replicating the ledger fragments from
@@ -143,13 +156,23 @@ public class ReplicationWorker implements Runnable {
         this.workerThread = new BookieThread(this, "ReplicationWorker");
         this.openLedgerRereplicationGracePeriod = conf
                 .getOpenLedgerRereplicationGracePeriod();
+        this.lockReleaseOfFailedLedgerGracePeriod = conf.getLockReleaseOfFailedLedgerGracePeriod();
         this.rwRereplicateBackoffMs = conf.getRwRereplicateBackoffMs();
         this.pendingReplicationTimer = new Timer("PendingReplicationTimer");
+        this.replicationFailedLedgers = CacheBuilder.newBuilder().maximumSize(REPLICATED_FAILED_LEDGERS_MAXSIZE)
+                .build(new CacheLoader<Long, AtomicInteger>() {
+                    @Override
+                    public AtomicInteger load(Long key) throws Exception {
+                        return new AtomicInteger();
+                    }
+                });
 
         // Expose Stats
         this.statsLogger = statsLogger;
         this.rereplicateOpStats = this.statsLogger.getOpStatsLogger(REREPLICATE_OP);
         this.numLedgersReplicated = this.statsLogger.getCounter(NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED);
+        this.numDeferLedgerLockReleaseOfFailedLedger = this.statsLogger
+                .getCounter(NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER);
         this.exceptionCounters = new HashMap<String, Counter>();
     }
 
@@ -273,6 +296,16 @@ public class ReplicationWorker implements Runnable {
                 underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
                 return true;
             } else {
+                if (replicationFailedLedgers.getUnchecked(ledgerIdToReplicate)
+                        .incrementAndGet() == MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING) {
+                    deferLedgerLockRelease = true;
+                    LOG.error(
+                            "ReplicationWorker failed to replicate Ledger : {} for {} number of times, "
+                            + "so deferring the ledger lock release",
+                            ledgerIdToReplicate, MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING);
+                    deferLedgerLockReleaseOfFailedLedger(ledgerIdToReplicate);
+                    numDeferLedgerLockReleaseOfFailedLedger.inc();
+                }
                 // Releasing the underReplication ledger lock and compete
                 // for the replication again for the pending fragments
                 return false;
@@ -441,6 +474,25 @@ public class ReplicationWorker implements Runnable {
     }
 
     /**
+     * Schedules a timer task for releasing the lock.
+     */
+    private void deferLedgerLockReleaseOfFailedLedger(final long ledgerId) {
+        TimerTask timerTask = new TimerTask() {
+            @Override
+            public void run() {
+                try {
+                    replicationFailedLedgers.invalidate(ledgerId);
+                    underreplicationManager.releaseUnderreplicatedLedger(ledgerId);
+                } catch (UnavailableException e) {
+                    LOG.error("UnavailableException while replicating fragments of ledger {}", ledgerId, e);
+                    shutdown();
+                }
+            }
+        };
+        pendingReplicationTimer.schedule(timerTask, lockReleaseOfFailedLedgerGracePeriod);
+    }
+
+    /**
      * Stop the replication worker service.
      */
     public void shutdown() {
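
To make the mechanism above easier to follow in isolation, here is a stand-alone sketch of the same pattern (hypothetical code, not part of the patch; releaseLock() is a placeholder, not a BookKeeper API): failures are counted per ledger in a bounded Guava LoadingCache, and once the threshold is reached the lock release is deferred on a Timer instead of happening immediately.

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.concurrent.atomic.AtomicInteger;

    public class DeferredLockReleaseSketch {
        private static final int MAX_FAILURES_BEFORE_DEFERRING = 10;
        private static final long GRACE_PERIOD_MS = 60_000L;

        private final Timer timer = new Timer("DeferredReleaseTimer");
        // Per-ledger failure counters; maximumSize keeps the cache bounded.
        private final LoadingCache<Long, AtomicInteger> failures = CacheBuilder.newBuilder()
                .maximumSize(100)
                .build(new CacheLoader<Long, AtomicInteger>() {
                    @Override
                    public AtomicInteger load(Long key) {
                        return new AtomicInteger();
                    }
                });

        void onReplicationFailed(final long ledgerId) {
            if (failures.getUnchecked(ledgerId).incrementAndGet() == MAX_FAILURES_BEFORE_DEFERRING) {
                // Threshold reached: hold the lock for the grace period so that
                // workers stop busy-retrying this ledger, then release it.
                timer.schedule(new TimerTask() {
                    @Override
                    public void run() {
                        failures.invalidate(ledgerId);
                        releaseLock(ledgerId);
                    }
                }, GRACE_PERIOD_MS);
            } else {
                // Below the threshold: release immediately and compete again.
                releaseLock(ledgerId);
            }
        }

        void releaseLock(long ledgerId) {
            // hypothetical stand-in for releasing the underreplicated ledger lock
        }
    }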
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index 609e154..0b15f1d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -22,6 +22,7 @@ package org.apache.bookkeeper.replication;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.net.URI;
 import java.util.ArrayList;
@@ -29,7 +30,10 @@ import java.util.Enumeration;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import lombok.Cleanup;
+
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.ClientUtil;
@@ -450,6 +454,135 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
     }
 
     /**
+     * Tests that ReplicationWorker will not make more than
+     * ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING
+     * replication attempts for a failing ledger, and that once it has failed
+     * that many times it defers the lock release by
+     * lockReleaseOfFailedLedgerGracePeriod.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testBookiesNotAvailableScenarioForReplicationWorker() throws Exception {
+        int ensembleSize = 3;
+        LedgerHandle lh = bkc.createLedger(ensembleSize, ensembleSize, BookKeeper.DigestType.CRC32, TESTPASSWD);
+
+        for (int i = 0; i < 10; i++) {
+            lh.addEntry(data);
+        }
+        lh.close();
+
+        BookieSocketAddress[] bookiesKilled = new BookieSocketAddress[ensembleSize];
+        ServerConfiguration[] killedBookiesConfig = new ServerConfiguration[ensembleSize];
+
+        // kill all bookies
+        for (int i = 0; i < ensembleSize; i++) {
+            bookiesKilled[i] = LedgerHandleAdapter.getLedgerMetadata(lh).getEnsembles().get(0L).get(i);
+            killedBookiesConfig[i] = getBkConf(bookiesKilled[i]);
+            LOG.info("Killing Bookie: {}", bookiesKilled[i]);
+            killBookie(bookiesKilled[i]);
+        }
+
+        // start ensembleSize number of new bookies
+        for (int i = 0; i < ensembleSize; i++) {
+            BookieSocketAddress newBkAddr = startNewBookieAndReturnAddress();
+        }
+
+        // create a couple of replication workers
+        baseConf.setLockReleaseOfFailedLedgerGracePeriod("500");
+        ReplicationWorker rw1 = new ReplicationWorker(zkc, baseConf);
+        ReplicationWorker rw2 = new ReplicationWorker(zkc, baseConf);
+
+        @Cleanup
+        MetadataClientDriver clientDriver = MetadataDrivers
+                .getClientDriver(URI.create(baseClientConf.getMetadataServiceUri()));
+        clientDriver.initialize(baseClientConf, scheduler, NullStatsLogger.INSTANCE, Optional.empty());
+
+        LedgerManagerFactory mFactory = clientDriver.getLedgerManagerFactory();
+
+        LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
+        try {
+            for (int i = 0; i < bookiesKilled.length; i++) {
+                underReplicationManager.markLedgerUnderreplicated(lh.getId(), bookiesKilled[i].toString());
+            }
+            while (!ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(), basePath)) {
+                Thread.sleep(100);
+            }
+            rw1.start();
+            rw2.start();
+
+            AtomicBoolean isBookieRestarted = new AtomicBoolean(false);
+
+            (new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        Thread.sleep(4000);
+                        isBookieRestarted.set(true);
+                        /*
+                         * after sleeping for 4000 msecs, restart one of the
+                         * killed bookies, so that replication can succeed.
+                         */
+                        startBookie(killedBookiesConfig[0]);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            })).start();
+
+            while (!isBookieRestarted.get()) {
+                /*
+                 * since all the bookies containing the ledger entries are down,
+                 * replication wouldn't have succeeded.
+                 */
+                assertTrue("Ledger: " + lh.getId() + " should be underreplicated",
+                        ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(), basePath));
+                /*
+                 * check that for both replication workers the number of failed
+                 * attempts is at most ReplicationWorker.
+                 * MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING
+                 */
+                int failedAttempts = rw1.replicationFailedLedgers.get(lh.getId()).get();
+                assertTrue(
+                        "The number of failed attempts should be at most "
+                                + "ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING, "
+                                + "but it is "
+                                + failedAttempts,
+                        failedAttempts <= ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING);
+
+                failedAttempts = rw2.replicationFailedLedgers.get(lh.getId()).get();
+                assertTrue(
+                        "The number of failed attempts should be at most "
+                                + "ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING, "
+                                + "but it is "
+                                + failedAttempts,
+                        failedAttempts <= ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING);
+
+                Thread.sleep(50);
+            }
+
+            /**
+             * since one of the killed bookies is restarted, the replication worker
+             * should succeed in replicating this underreplicated ledger and it
+             * shouldn't be underreplicated anymore.
+             */
+            int timeToWaitForReplicationToComplete = 2000;
+            int timeWaited = 0;
+            while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(), basePath)) {
+                Thread.sleep(100);
+                timeWaited += 100;
+                if (timeWaited == timeToWaitForReplicationToComplete) {
+                    fail("Ledger should be replicated by now");
+                }
+            }
+        } finally {
+            rw1.shutdown();
+            rw2.shutdown();
+            underReplicationManager.close();
+        }
+    }
+
+    /**
      * Tests that ReplicationWorker should not have identified for postponing
      * the replication if ledger is in open state and lastFragment is not in
      * underReplication state. Note that RW should not fence such ledgers.
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index 481d828..f366c60 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -626,6 +626,9 @@ groups:
   - param: openLedgerRereplicationGracePeriod
     description: The grace period, in seconds, that the replication worker waits before fencing and replicating a ledger fragment that's still being written to upon bookie failure.
     default: 30
+  - param: lockReleaseOfFailedLedgerGracePeriod
+    description: The grace period, in milliseconds, for which the replication worker holds the underreplicated ledger lock before releasing it, once it has failed to replicate that ledger more than ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING times; without this deferral the lock would be released immediately after each failed attempt.
+    default: 60000
   - param: rwRereplicateBackoffMs
     description: The time to backoff when replication worker encounters exceptions on replicating a ledger, in milliseconds.
     default: 5000
