Repository: bookkeeper Updated Branches: refs/heads/master 9a8d62b1d -> 1a98088e3
BOOKKEEPER-594: AutoRecovery shutting down on SyncDisconnected Author: Matteo Merli <[email protected]> Reviewers: Sijie Guo <[email protected]> Closes #26 from merlimat/bk-594 Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/1a98088e Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/1a98088e Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/1a98088e Branch: refs/heads/master Commit: 1a98088e38ca0a43c26d9a4847619b0a27bb90e8 Parents: 9a8d62b Author: Matteo Merli <[email protected]> Authored: Tue Mar 15 20:51:00 2016 -0700 Committer: Sijie Guo <[email protected]> Committed: Tue Mar 15 20:51:00 2016 -0700 ---------------------------------------------------------------------- .../replication/AutoRecoveryMain.java | 10 ++++++ .../replication/ReplicationWorker.java | 33 +++++++++++++++----- 2 files changed, 36 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/1a98088e/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java index 4a84b3c..3039470 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java @@ -109,6 +109,15 @@ public class AutoRecoveryMain { deathWatcher = new AutoRecoveryDeathWatcher(this); } + public AutoRecoveryMain(ServerConfiguration conf, ZooKeeper zk) throws IOException, InterruptedException, KeeperException, + UnavailableException, CompatibilityException { + this.conf = conf; + this.zk = zk; + auditorElector = new AuditorElector(Bookie.getBookieAddress(conf).toString(), conf, zk); + replicationWorker = new ReplicationWorker(zk, conf, Bookie.getBookieAddress(conf)); + deathWatcher = new AutoRecoveryDeathWatcher(this); + } + /* * Start daemons */ @@ -134,6 +143,7 @@ public class AutoRecoveryMain { } private void shutdown(int exitCode) { + LOG.info("Shutting down auto recovery: {}", exitCode); if (shuttingDown) { return; } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/1a98088e/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java index 1a10667..e26d09a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java @@ -156,25 +156,42 @@ public class ReplicationWorker implements Runnable { try { rereplicate(); } catch (InterruptedException e) { - shutdown(); - Thread.currentThread().interrupt(); LOG.info("InterruptedException " + "while replicating fragments", e); + shutdown(); + Thread.currentThread().interrupt(); return; } catch (BKException e) { - shutdown(); LOG.error("BKException while replicating fragments", e); - return; + if (e instanceof BKException.BKWriteOnReadOnlyBookieException) { + waitTillTargetBookieIsWritable(); + } else { + waitBackOffTime(); + } } catch (UnavailableException e) { - shutdown(); LOG.error("UnavailableException " + "while replicating fragments", e); - return; + waitBackOffTime(); } } LOG.info("ReplicationWorker exited loop!"); } + private static void waitBackOffTime() { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + } + } + + private void waitTillTargetBookieIsWritable() { + LOG.info("Waiting for target bookie {} to be back in read/write mode", targetBookie); + while (admin.getReadOnlyBookies().contains(targetBookie)) { + waitBackOffTime(); + } + LOG.info("Target bookie {} is back in read/write mode", targetBookie); + } + /** * Replicates the under replicated fragments from failed bookie ledger to * targetBookie @@ -378,9 +395,9 @@ public class ReplicationWorker implements Runnable { underreplicationManager .releaseUnderreplicatedLedger(ledgerId); } catch (UnavailableException e) { - shutdown(); LOG.error("UnavailableException " + "while replicating fragments", e); + shutdown(); } } } @@ -393,6 +410,8 @@ public class ReplicationWorker implements Runnable { * Stop the replication worker service */ public void shutdown() { + LOG.info("Shutting down replication worker"); + synchronized (this) { if (!workerRunning) { return;
