This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 8c077a9 Issue #1405: ReplicationWorker should back-off retrying.
8c077a9 is described below
commit 8c077a93e50ea844e0c39ca042ed2aa8cbd6d256
Author: cguttapalem <[email protected]>
AuthorDate: Thu May 17 01:38:20 2018 -0700
Issue #1405: ReplicationWorker should back-off retrying.
Descriptions of the changes in this PR:

ReplicationWorker should back off replication after a threshold
number of replication failures for a ledger.

Currently the ReplicationWorker busy-retries when replication of a
ledger fails; instead, it should back off once replication has failed
a threshold number of times. This is done by deferring the release of
the underreplicated-ledger lock by the
'lockReleaseOfFailedLedgerGracePeriod' amount of time.
Master Issue: #1405
Author: cguttapalem <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>, Sijie Guo <[email protected]>
This closes #1406 from reddycharan/fixreplicator, closes #1405
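
As a rough illustration of the approach (a simplified, self-contained sketch, not the patch itself: the class and method names below, such as DeferredLockReleaseSketch, onReplicationFailure and releaseUnderreplicatedLedgerLock, are invented for this example, and the real ReplicationWorker uses a bounded Guava LoadingCache plus its pendingReplicationTimer):

    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicInteger;

    public class DeferredLockReleaseSketch {

        private static final int MAX_FAILURES_BEFORE_DEFERRING = 10;

        private final long lockReleaseOfFailedLedgerGracePeriodMs;
        private final Timer timer = new Timer("PendingReplicationTimer");
        // per-ledger failure counters; the real worker bounds this with a Guava LoadingCache
        private final ConcurrentMap<Long, AtomicInteger> failureCounts = new ConcurrentHashMap<>();

        public DeferredLockReleaseSketch(long gracePeriodMs) {
            this.lockReleaseOfFailedLedgerGracePeriodMs = gracePeriodMs;
        }

        /** Called after a replication attempt for the ledger has failed. */
        public void onReplicationFailure(long ledgerId) {
            int failures = failureCounts
                    .computeIfAbsent(ledgerId, id -> new AtomicInteger())
                    .incrementAndGet();
            if (failures >= MAX_FAILURES_BEFORE_DEFERRING) {
                // Too many failures: hold the underreplication lock for the grace
                // period before releasing it, so workers stop busy-retrying.
                timer.schedule(new TimerTask() {
                    @Override
                    public void run() {
                        failureCounts.remove(ledgerId);
                        releaseUnderreplicatedLedgerLock(ledgerId);
                    }
                }, lockReleaseOfFailedLedgerGracePeriodMs);
            } else {
                // Below the threshold: release the lock right away and compete again.
                releaseUnderreplicatedLedgerLock(ledgerId);
            }
        }

        private void releaseUnderreplicatedLedgerLock(long ledgerId) {
            // Stand-in for LedgerUnderreplicationManager.releaseUnderreplicatedLedger(ledgerId).
            System.out.println("released underreplication lock for ledger " + ledgerId);
        }
    }
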
---
.../bookkeeper/conf/ServerConfiguration.java | 28 +++++
.../bookkeeper/replication/ReplicationStats.java | 1 +
.../bookkeeper/replication/ReplicationWorker.java | 52 ++++++++
.../replication/TestReplicationWorker.java | 133 +++++++++++++++++++++
site/_data/config/bk_server.yaml | 3 +
5 files changed, 217 insertions(+)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index bdc26e7..a763ddf 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -101,6 +101,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     protected static final String ZK_RETRY_BACKOFF_START_MS = "zkRetryBackoffStartMs";
     protected static final String ZK_RETRY_BACKOFF_MAX_MS = "zkRetryBackoffMaxMs";
     protected static final String OPEN_LEDGER_REREPLICATION_GRACE_PERIOD = "openLedgerRereplicationGracePeriod";
+    protected static final String LOCK_RELEASE_OF_FAILED_LEDGER_GRACE_PERIOD = "lockReleaseOfFailedLedgerGracePeriod";
     //ReadOnly mode support on all disk full
     protected static final String READ_ONLY_MODE_ENABLED = "readOnlyModeEnabled";
     //Whether the bookie is force started in ReadOnly mode
@@ -1236,6 +1237,33 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     }
 
     /**
+     * Set the grace period so that if the replication worker fails to replicate
+     * an underreplicated ledger
+     * ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING
+     * times, then instead of releasing the lock immediately after the failed
+     * attempt, it holds the underreplicated ledger lock for this grace period
+     * and only then releases the lock.
+     *
+     * @param waitTime the grace period, in milliseconds
+     */
+    public void setLockReleaseOfFailedLedgerGracePeriod(String waitTime) {
+        setProperty(LOCK_RELEASE_OF_FAILED_LEDGER_GRACE_PERIOD, waitTime);
+    }
+
+    /**
+     * Get the grace period for which the replication worker holds the lock
+     * after it has failed to replicate a ledger
+     * ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING
+     * times.
+     *
+     * @return the grace period, in milliseconds
+     */
+    public long getLockReleaseOfFailedLedgerGracePeriod() {
+        return getLong(LOCK_RELEASE_OF_FAILED_LEDGER_GRACE_PERIOD, 60000);
+    }
+
+    /**
      * Get the number of bytes we should use as capacity for
      * org.apache.bookkeeper.bookie.BufferedReadChannel.
      * Default is 512 bytes
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
index 4f48b78..b1afa81 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
@@ -47,6 +47,7 @@ public interface ReplicationStats {
     String NUM_ENTRIES_WRITTEN = "NUM_ENTRIES_WRITTEN";
     String NUM_BYTES_WRITTEN = "NUM_BYTES_WRITTEN";
     String REPLICATE_EXCEPTION = "exceptions";
+    String NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER = "NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER";
 
     String BK_CLIENT_SCOPE = "bk_client";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index 276ed71..7d51a7a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -20,11 +20,16 @@
 package org.apache.bookkeeper.replication;
 
 import static org.apache.bookkeeper.replication.ReplicationStats.BK_CLIENT_SCOPE;
+import static org.apache.bookkeeper.replication.ReplicationStats.NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER;
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED;
 import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATE_EXCEPTION;
 import static org.apache.bookkeeper.replication.ReplicationStats.REREPLICATE_OP;
 
 import com.google.common.base.Stopwatch;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -36,6 +41,8 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.bookkeeper.bookie.BookieThread;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
@@ -71,6 +78,9 @@ import org.slf4j.LoggerFactory;
 public class ReplicationWorker implements Runnable {
     private static final Logger LOG = LoggerFactory
             .getLogger(ReplicationWorker.class);
+    private static final int REPLICATED_FAILED_LEDGERS_MAXSIZE = 100;
+    static final int MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING = 10;
+
     private final LedgerUnderreplicationManager underreplicationManager;
     private final ServerConfiguration conf;
     private final ZooKeeper zkc;
@@ -82,12 +92,15 @@ public class ReplicationWorker implements Runnable {
     private final long rwRereplicateBackoffMs;
     private final long openLedgerRereplicationGracePeriod;
     private final Timer pendingReplicationTimer;
+    private final long lockReleaseOfFailedLedgerGracePeriod;
 
     // Expose Stats
     private final StatsLogger statsLogger;
     private final OpStatsLogger rereplicateOpStats;
     private final Counter numLedgersReplicated;
+    private final Counter numDeferLedgerLockReleaseOfFailedLedger;
     private final Map<String, Counter> exceptionCounters;
+    final LoadingCache<Long, AtomicInteger> replicationFailedLedgers;
 
     /**
      * Replication worker for replicating the ledger fragments from
@@ -143,13 +156,23 @@ public class ReplicationWorker implements Runnable {
         this.workerThread = new BookieThread(this, "ReplicationWorker");
         this.openLedgerRereplicationGracePeriod = conf
                 .getOpenLedgerRereplicationGracePeriod();
+        this.lockReleaseOfFailedLedgerGracePeriod = conf.getLockReleaseOfFailedLedgerGracePeriod();
         this.rwRereplicateBackoffMs = conf.getRwRereplicateBackoffMs();
         this.pendingReplicationTimer = new Timer("PendingReplicationTimer");
+        this.replicationFailedLedgers = CacheBuilder.newBuilder().maximumSize(REPLICATED_FAILED_LEDGERS_MAXSIZE)
+                .build(new CacheLoader<Long, AtomicInteger>() {
+                    @Override
+                    public AtomicInteger load(Long key) throws Exception {
+                        return new AtomicInteger();
+                    }
+                });
 
         // Expose Stats
         this.statsLogger = statsLogger;
         this.rereplicateOpStats = this.statsLogger.getOpStatsLogger(REREPLICATE_OP);
         this.numLedgersReplicated = this.statsLogger.getCounter(NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED);
+        this.numDeferLedgerLockReleaseOfFailedLedger = this.statsLogger
+                .getCounter(NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER);
         this.exceptionCounters = new HashMap<String, Counter>();
     }
@@ -273,6 +296,16 @@ public class ReplicationWorker implements Runnable {
             underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
             return true;
         } else {
+            if (replicationFailedLedgers.getUnchecked(ledgerIdToReplicate)
+                    .incrementAndGet() == MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING) {
+                deferLedgerLockRelease = true;
+                LOG.error(
+                        "ReplicationWorker failed to replicate Ledger : {} for {} number of times, "
+                                + "so deferring the ledger lock release",
+                        ledgerIdToReplicate, MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING);
+                deferLedgerLockReleaseOfFailedLedger(ledgerIdToReplicate);
+                numDeferLedgerLockReleaseOfFailedLedger.inc();
+            }
             // Releasing the underReplication ledger lock and compete
             // for the replication again for the pending fragments
             return false;
@@ -441,6 +474,25 @@ public class ReplicationWorker implements Runnable {
     }
 
     /**
+     * Schedules a timer task for releasing the lock.
+     */
+    private void deferLedgerLockReleaseOfFailedLedger(final long ledgerId) {
+        TimerTask timerTask = new TimerTask() {
+            @Override
+            public void run() {
+                try {
+                    replicationFailedLedgers.invalidate(ledgerId);
+                    underreplicationManager.releaseUnderreplicatedLedger(ledgerId);
+                } catch (UnavailableException e) {
+                    LOG.error("UnavailableException while replicating fragments of ledger {}", ledgerId, e);
+                    shutdown();
+                }
+            }
+        };
+        pendingReplicationTimer.schedule(timerTask, lockReleaseOfFailedLedgerGracePeriod);
+    }
+
+    /**
      * Stop the replication worker service.
      */
     public void shutdown() {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index 609e154..0b15f1d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -22,6 +22,7 @@ package org.apache.bookkeeper.replication;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.net.URI;
 import java.util.ArrayList;
@@ -29,7 +30,10 @@ import java.util.Enumeration;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import lombok.Cleanup;
+
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.ClientUtil;
@@ -450,6 +454,135 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
     }
 
     /**
+     * Tests that the ReplicationWorker will not make more than
+     * ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING
+     * replication attempts for a failing ledger, and that once it has failed
+     * that many times it defers the lock release by
+     * lockReleaseOfFailedLedgerGracePeriod.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testBookiesNotAvailableScenarioForReplicationWorker() throws Exception {
+        int ensembleSize = 3;
+        LedgerHandle lh = bkc.createLedger(ensembleSize, ensembleSize, BookKeeper.DigestType.CRC32, TESTPASSWD);
+
+        for (int i = 0; i < 10; i++) {
+            lh.addEntry(data);
+        }
+        lh.close();
+
+        BookieSocketAddress[] bookiesKilled = new BookieSocketAddress[ensembleSize];
+        ServerConfiguration[] killedBookiesConfig = new ServerConfiguration[ensembleSize];
+
+        // kill all bookies of the ensemble
+        for (int i = 0; i < ensembleSize; i++) {
+            bookiesKilled[i] = LedgerHandleAdapter.getLedgerMetadata(lh).getEnsembles().get(0L).get(i);
+            killedBookiesConfig[i] = getBkConf(bookiesKilled[i]);
+            LOG.info("Killing Bookie : {}", bookiesKilled[i]);
+            killBookie(bookiesKilled[i]);
+        }
+
+        // start the same number of new bookies
+        for (int i = 0; i < ensembleSize; i++) {
+            BookieSocketAddress newBkAddr = startNewBookieAndReturnAddress();
+        }
+
+        // create a couple of replication workers
+        baseConf.setLockReleaseOfFailedLedgerGracePeriod("500");
+        ReplicationWorker rw1 = new ReplicationWorker(zkc, baseConf);
+        ReplicationWorker rw2 = new ReplicationWorker(zkc, baseConf);
+
+        @Cleanup
+        MetadataClientDriver clientDriver = MetadataDrivers
+                .getClientDriver(URI.create(baseClientConf.getMetadataServiceUri()));
+        clientDriver.initialize(baseClientConf, scheduler, NullStatsLogger.INSTANCE, Optional.empty());
+
+        LedgerManagerFactory mFactory = clientDriver.getLedgerManagerFactory();
+
+        LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
+        try {
+            for (int i = 0; i < bookiesKilled.length; i++) {
+                underReplicationManager.markLedgerUnderreplicated(lh.getId(), bookiesKilled[i].toString());
+            }
+            while (!ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(), basePath)) {
+                Thread.sleep(100);
+            }
+            rw1.start();
+            rw2.start();
+
+            AtomicBoolean isBookieRestarted = new AtomicBoolean(false);
+
+            (new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        Thread.sleep(4000);
+                        isBookieRestarted.set(true);
+                        /*
+                         * after sleeping for 4000 msecs, restart one of the
+                         * killed bookies, so that replication can succeed.
+                         */
+                        startBookie(killedBookiesConfig[0]);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            })).start();
+
+            while (!isBookieRestarted.get()) {
+                /*
+                 * since all the bookies containing the ledger entries are down,
+                 * replication cannot have succeeded.
+                 */
+                assertTrue("Ledger: " + lh.getId() + " should be underreplicated",
+                        ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(), basePath));
+                /*
+                 * for both replication workers, the number of failed attempts
+                 * should be no more than ReplicationWorker.
+                 * MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING
+                 */
+                int failedAttempts = rw1.replicationFailedLedgers.get(lh.getId()).get();
+                assertTrue(
+                        "The number of failed attempts should be no more than "
+                                + "ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING, "
+                                + "but it is " + failedAttempts,
+                        failedAttempts <= ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING);
+
+                failedAttempts = rw2.replicationFailedLedgers.get(lh.getId()).get();
+                assertTrue(
+                        "The number of failed attempts should be no more than "
+                                + "ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING, "
+                                + "but it is " + failedAttempts,
+                        failedAttempts <= ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING);
+
+                Thread.sleep(50);
+            }
+
+            /*
+             * since one of the killed bookies is restarted, the replication
+             * workers should succeed in replicating this underreplicated
+             * ledger and it should not be underreplicated anymore.
+             */
+            int timeToWaitForReplicationToComplete = 2000;
+            int timeWaited = 0;
+            while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(), basePath)) {
+                Thread.sleep(100);
+                timeWaited += 100;
+                if (timeWaited == timeToWaitForReplicationToComplete) {
+                    fail("Ledger should be replicated by now");
+                }
+            }
+        } finally {
+            rw1.shutdown();
+            rw2.shutdown();
+            underReplicationManager.close();
+        }
+    }
+
+    /**
      * Tests that ReplicationWorker should not have identified for postponing
      * the replication if ledger is in open state and lastFragment is not in
      * underReplication state. Note that RW should not fence such ledgers.
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index 481d828..f366c60 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -626,6 +626,9 @@ groups:
   - param: openLedgerRereplicationGracePeriod
     description: The grace period, in seconds, that the replication worker waits before fencing and replicating a ledger fragment that's still being written to upon bookie failure.
     default: 30
+  - param: lockReleaseOfFailedLedgerGracePeriod
+    description: The grace period for which the replication worker holds the underreplicated ledger lock, instead of releasing it immediately after a failed attempt, once it has failed to replicate the ledger ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING times.
+    default: 60
   - param: rwRereplicateBackoffMs
     description: The time to backoff when replication worker encounters exceptions on replicating a ledger, in milliseconds.
     default: 5000
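
For operators, a minimal sketch of wiring the new setting through ServerConfiguration (the value is handed to a java.util.Timer delay in ReplicationWorker, so it appears to be interpreted as milliseconds; the class name ConfigureDeferredLockRelease and the 30000 figure below are illustrative only, not a recommendation):

    import org.apache.bookkeeper.conf.ServerConfiguration;

    public class ConfigureDeferredLockRelease {
        public static void main(String[] args) {
            ServerConfiguration conf = new ServerConfiguration();
            // Hold the underreplicated-ledger lock for 30s (illustrative value) once a
            // ledger has failed replication the threshold number of times.
            conf.setLockReleaseOfFailedLedgerGracePeriod("30000");
            System.out.println("lockReleaseOfFailedLedgerGracePeriod = "
                    + conf.getLockReleaseOfFailedLedgerGracePeriod() + " ms");
        }
    }
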
--
To stop receiving notification emails like this one, please contact
[email protected].